Skip to main content

fakecloud_rds/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_persistence::SnapshotStore;
14
15use crate::runtime::{RdsRuntime, RuntimeError};
16use crate::state::{
17    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
18    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
19    SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
20};
21
/// XML namespace URI of the RDS Query API, version `2014-10-31`.
// NOTE(review): presumably emitted as the `xmlns` of response documents —
// confirm at the use site, which is outside this part of the file.
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
23
/// Reports whether `action` is treated as a state-mutating RDS operation.
///
/// An action is mutating when it is one of the explicitly listed core
/// operations, or when its name starts with one of the mutating verb
/// prefixes (the heuristic used for the remaining operations). Matching is
/// case-sensitive.
fn is_mutating_action(action: &str) -> bool {
    // Core operations that are always classified as mutating.
    const CORE_MUTATING_ACTIONS: &[&str] = &[
        "AddTagsToResource",
        "CreateDBInstance",
        "CreateDBInstanceReadReplica",
        "CreateDBParameterGroup",
        "CreateDBSnapshot",
        "CreateDBSubnetGroup",
        "DeleteDBInstance",
        "DeleteDBParameterGroup",
        "DeleteDBSnapshot",
        "DeleteDBSubnetGroup",
        "ModifyDBInstance",
        "ModifyDBParameterGroup",
        "ModifyDBSubnetGroup",
        "RebootDBInstance",
        "RemoveTagsFromResource",
        "RestoreDBInstanceFromDBSnapshot",
    ];
    // Verb prefixes: any action whose name begins with one of these is
    // assumed to change state.
    const MUTATING_PREFIXES: &[&str] = &[
        "Create",
        "Modify",
        "Delete",
        "Reboot",
        "Start",
        "Stop",
        "Failover",
        "Switchover",
        "Promote",
        "Reset",
        "Apply",
        "Authorize",
        "Revoke",
        "Add",
        "Remove",
        "Register",
        "Deregister",
        "Copy",
        "Restore",
        "Backtrack",
        "Cancel",
        "Purchase",
        "Disable",
        "Enable",
    ];

    CORE_MUTATING_ACTIONS.contains(&action)
        || MUTATING_PREFIXES
            .iter()
            .copied()
            .any(|prefix| action.starts_with(prefix))
}
/// Every RDS action name this service advertises; returned verbatim by the
/// `AwsService::supported_actions` implementation for `RdsService`.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddRoleToDBCluster",
    "AddRoleToDBInstance",
    "AddSourceIdentifierToSubscription",
    "AddTagsToResource",
    "ApplyPendingMaintenanceAction",
    "AuthorizeDBSecurityGroupIngress",
    "BacktrackDBCluster",
    "CancelExportTask",
    "CopyDBClusterParameterGroup",
    "CopyDBClusterSnapshot",
    "CopyDBParameterGroup",
    "CopyDBSnapshot",
    "CopyOptionGroup",
    "CreateBlueGreenDeployment",
    "CreateCustomDBEngineVersion",
    "CreateDBCluster",
    "CreateDBClusterEndpoint",
    "CreateDBClusterParameterGroup",
    "CreateDBClusterSnapshot",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBProxy",
    "CreateDBProxyEndpoint",
    "CreateDBSecurityGroup",
    "CreateDBShardGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "CreateEventSubscription",
    "CreateGlobalCluster",
    "CreateIntegration",
    "CreateOptionGroup",
    "CreateTenantDatabase",
    "DeleteBlueGreenDeployment",
    "DeleteCustomDBEngineVersion",
    "DeleteDBCluster",
    "DeleteDBClusterAutomatedBackup",
    "DeleteDBClusterEndpoint",
    "DeleteDBClusterParameterGroup",
    "DeleteDBClusterSnapshot",
    "DeleteDBInstance",
    "DeleteDBInstanceAutomatedBackup",
    "DeleteDBParameterGroup",
    "DeleteDBProxy",
    "DeleteDBProxyEndpoint",
    "DeleteDBSecurityGroup",
    "DeleteDBShardGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DeleteEventSubscription",
    "DeleteGlobalCluster",
    "DeleteIntegration",
    "DeleteOptionGroup",
    "DeleteTenantDatabase",
    "DeregisterDBProxyTargets",
    "DescribeAccountAttributes",
    "DescribeBlueGreenDeployments",
    "DescribeCertificates",
    "DescribeDBClusterAutomatedBackups",
    "DescribeDBClusterBacktracks",
    "DescribeDBClusterEndpoints",
    "DescribeDBClusterParameterGroups",
    "DescribeDBClusterParameters",
    "DescribeDBClusterSnapshotAttributes",
    "DescribeDBClusterSnapshots",
    "DescribeDBClusters",
    "DescribeDBEngineVersions",
    "DescribeDBInstanceAutomatedBackups",
    "DescribeDBInstances",
    "DescribeDBLogFiles",
    "DescribeDBMajorEngineVersions",
    "DescribeDBParameterGroups",
    "DescribeDBParameters",
    "DescribeDBProxies",
    "DescribeDBProxyEndpoints",
    "DescribeDBProxyTargetGroups",
    "DescribeDBProxyTargets",
    "DescribeDBRecommendations",
    "DescribeDBSecurityGroups",
    "DescribeDBShardGroups",
    "DescribeDBSnapshotAttributes",
    "DescribeDBSnapshotTenantDatabases",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeEngineDefaultClusterParameters",
    "DescribeEngineDefaultParameters",
    "DescribeEventCategories",
    "DescribeEventSubscriptions",
    "DescribeEvents",
    "DescribeExportTasks",
    "DescribeGlobalClusters",
    "DescribeIntegrations",
    "DescribeOptionGroupOptions",
    "DescribeOptionGroups",
    "DescribeOrderableDBInstanceOptions",
    "DescribePendingMaintenanceActions",
    "DescribeReservedDBInstances",
    "DescribeReservedDBInstancesOfferings",
    "DescribeSourceRegions",
    "DescribeTenantDatabases",
    "DescribeValidDBInstanceModifications",
    "DisableHttpEndpoint",
    "DownloadDBLogFilePortion",
    "EnableHttpEndpoint",
    "FailoverDBCluster",
    "FailoverGlobalCluster",
    "ListTagsForResource",
    "ModifyActivityStream",
    "ModifyCertificates",
    "ModifyCurrentDBClusterCapacity",
    "ModifyCustomDBEngineVersion",
    "ModifyDBCluster",
    "ModifyDBClusterEndpoint",
    "ModifyDBClusterParameterGroup",
    "ModifyDBClusterSnapshotAttribute",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBProxy",
    "ModifyDBProxyEndpoint",
    "ModifyDBProxyTargetGroup",
    "ModifyDBRecommendation",
    "ModifyDBShardGroup",
    "ModifyDBSnapshot",
    "ModifyDBSnapshotAttribute",
    "ModifyDBSubnetGroup",
    "ModifyEventSubscription",
    "ModifyGlobalCluster",
    "ModifyIntegration",
    "ModifyOptionGroup",
    "ModifyTenantDatabase",
    "PromoteReadReplica",
    "PromoteReadReplicaDBCluster",
    "PurchaseReservedDBInstancesOffering",
    "RebootDBCluster",
    "RebootDBInstance",
    "RebootDBShardGroup",
    "RegisterDBProxyTargets",
    "RemoveFromGlobalCluster",
    "RemoveRoleFromDBCluster",
    "RemoveRoleFromDBInstance",
    "RemoveSourceIdentifierFromSubscription",
    "RemoveTagsFromResource",
    "ResetDBClusterParameterGroup",
    "ResetDBParameterGroup",
    "RestoreDBClusterFromS3",
    "RestoreDBClusterFromSnapshot",
    "RestoreDBClusterToPointInTime",
    "RestoreDBInstanceFromDBSnapshot",
    "RestoreDBInstanceFromS3",
    "RestoreDBInstanceToPointInTime",
    "RevokeDBSecurityGroupIngress",
    "StartActivityStream",
    "StartDBCluster",
    "StartDBInstance",
    "StartDBInstanceAutomatedBackupsReplication",
    "StartExportTask",
    "StopActivityStream",
    "StopDBCluster",
    "StopDBInstance",
    "StopDBInstanceAutomatedBackupsReplication",
    "SwitchoverBlueGreenDeployment",
    "SwitchoverGlobalCluster",
    "SwitchoverReadReplica",
];
240
/// RDS service handler: the shared RDS state plus optional collaborators
/// (container runtime, snapshot persistence, event delivery) that are attached
/// through the `with_*` builder methods.
pub struct RdsService {
    /// Shared RDS state, accessed through its `read()` / `write()` guards.
    pub(crate) state: SharedRdsState,
    /// Container runtime; `None` until `with_runtime` is called.
    runtime: Option<Arc<RdsRuntime>>,
    /// Persistence backend for state snapshots; while `None`, `save_snapshot`
    /// returns without doing anything.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Held for the whole of `save_snapshot` so snapshot writes do not overlap.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Bus handed to `emit_event_static` by `emit_event`; `None` until
    /// `with_delivery_bus` is called.
    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
}
248
/// Source type for RDS EventBridge events. Maps `aws.rds` detail-type.
#[derive(Clone, Copy)]
// NOTE(review): presumably not every variant is constructed yet (hence
// `dead_code`), and the shared `Db` prefix is intentional — confirm.
#[allow(dead_code, clippy::enum_variant_names)]
pub(crate) enum RdsSourceType {
    /// A DB instance is the event source.
    DbInstance,
    /// A DB snapshot is the event source.
    DbSnapshot,
    /// A DB parameter group is the event source.
    DbParameterGroup,
}

impl RdsSourceType {
    /// Returns the `(source-type label, detail-type)` pair for this variant,
    /// so both strings for a variant live side by side.
    fn labels(self) -> (&'static str, &'static str) {
        match self {
            Self::DbInstance => ("DB_INSTANCE", "RDS DB Instance Event"),
            Self::DbSnapshot => ("DB_SNAPSHOT", "RDS DB Snapshot Event"),
            Self::DbParameterGroup => ("DB_PARAMETER_GROUP", "RDS DB Parameter Group Event"),
        }
    }

    /// Upper-snake-case source-type label, e.g. `DB_INSTANCE`.
    fn as_str(self) -> &'static str {
        let (source_type, _) = self.labels();
        source_type
    }

    /// Human-readable detail-type string, e.g. `RDS DB Instance Event`.
    fn detail_type(self) -> &'static str {
        let (_, detail_type) = self.labels();
        detail_type
    }
}
275
impl RdsService {
    /// Borrow the shared RDS state handle held by this service.
    pub(crate) fn state_handle(&self) -> &SharedRdsState {
        &self.state
    }
}
281
282impl RdsService {
283    pub fn new(state: SharedRdsState) -> Self {
284        Self {
285            state,
286            runtime: None,
287            snapshot_store: None,
288            snapshot_lock: Arc::new(AsyncMutex::new(())),
289            delivery_bus: None,
290        }
291    }
292
293    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
294        self.runtime = Some(runtime);
295        self
296    }
297
298    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
299        self.snapshot_store = Some(store);
300        self
301    }
302
303    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
304        self.delivery_bus = Some(bus);
305        self
306    }
307
308    /// Emit an `aws.rds` EventBridge event mirroring the AWS RDS event schema.
309    /// No-op when the delivery bus isn't wired (tests, minimal configs).
310    pub(crate) fn emit_event(
311        &self,
312        source_type: RdsSourceType,
313        source_identifier: &str,
314        source_arn: &str,
315        event_id: &str,
316        event_categories: &[&str],
317        message: &str,
318    ) {
319        emit_event_static(
320            self.delivery_bus.as_ref(),
321            source_type,
322            source_identifier,
323            source_arn,
324            event_id,
325            event_categories,
326            message,
327        );
328    }
329
330    async fn save_snapshot(&self) {
331        let Some(store) = self.snapshot_store.clone() else {
332            return;
333        };
334        let _guard = self.snapshot_lock.lock().await;
335        let snapshot = RdsSnapshot {
336            schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
337            state: None,
338            accounts: Some(self.state.read().clone()),
339        };
340        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
341            let bytes = serde_json::to_vec(&snapshot)
342                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
343            store.save(&bytes)
344        })
345        .await;
346        match join {
347            Ok(Ok(())) => {}
348            Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
349            Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
350        }
351    }
352
353    /// Return the runtime or a ``ServiceUnavailable`` error if it was not configured.
354    ///
355    /// RDS operations that start, stop, or reach into a database container fail
356    /// with a consistent wire error when the daemon (Docker/Podman) is missing
357    /// rather than each caller restating the message.
358    fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
359        self.runtime.as_ref().ok_or_else(|| {
360            AwsServiceError::aws_error(
361                StatusCode::SERVICE_UNAVAILABLE,
362                "InvalidParameterValue",
363                "Docker/Podman is required for RDS DB instances but is not available",
364            )
365        })
366    }
367}
368
369#[async_trait]
370impl AwsService for RdsService {
371    fn service_name(&self) -> &str {
372        "rds"
373    }
374
375    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
376        let mutates = is_mutating_action(request.action.as_str());
377        let result = match request.action.as_str() {
378            "AddTagsToResource" => self.add_tags_to_resource(&request),
379            "CreateDBInstance" => self.create_db_instance(&request).await,
380            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
381            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
382            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
383            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
384            "DeleteDBInstance" => self.delete_db_instance(&request).await,
385            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
386            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
387            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
388            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
389            "DescribeDBInstances" => self.describe_db_instances(&request),
390            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
391            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
392            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
393            "DescribeOrderableDBInstanceOptions" => {
394                self.describe_orderable_db_instance_options(&request)
395            }
396            "ListTagsForResource" => self.list_tags_for_resource(&request),
397            "ModifyDBInstance" => self.modify_db_instance(&request),
398            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
399            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
400            "RebootDBInstance" => self.reboot_db_instance(&request).await,
401            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
402            "RestoreDBInstanceFromDBSnapshot" => {
403                self.restore_db_instance_from_db_snapshot(&request).await
404            }
405            _ => self.handle_extra_action(&request),
406        };
407        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
408            self.save_snapshot().await;
409        }
410        result
411    }
412
413    fn supported_actions(&self) -> &[&str] {
414        SUPPORTED_ACTIONS
415    }
416}
417
418impl RdsService {
419    async fn create_db_instance(
420        &self,
421        request: &AwsRequest,
422    ) -> Result<AwsResponse, AwsServiceError> {
423        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
424        let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
425        let db_instance_class = required_param(request, "DBInstanceClass")?;
426        let engine = required_param(request, "Engine")?;
427        let master_username = required_param(request, "MasterUsername")?;
428        let master_user_password = required_param(request, "MasterUserPassword")?;
429        let db_name = optional_param(request, "DBName");
430        let engine_version =
431            optional_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
432        let publicly_accessible =
433            parse_optional_bool(optional_param(request, "PubliclyAccessible").as_deref())?
434                .unwrap_or(true);
435        let deletion_protection =
436            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?
437                .unwrap_or(false);
438        let port = optional_i32_param(request, "Port")?
439            .unwrap_or_else(|| default_port_for_engine(&engine));
440        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
441
442        let db_parameter_group_name = optional_param(request, "DBParameterGroupName")
443            .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
444
445        let backup_retention_period =
446            optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
447        let preferred_backup_window = optional_param(request, "PreferredBackupWindow")
448            .unwrap_or_else(|| "03:00-04:00".to_string());
449        let option_group_name = optional_param(request, "OptionGroupName");
450        let multi_az =
451            parse_optional_bool(optional_param(request, "MultiAZ").as_deref())?.unwrap_or(false);
452
453        validate_create_request(
454            &db_instance_identifier,
455            allocated_storage,
456            &db_instance_class,
457            &engine,
458            &engine_version,
459            port,
460        )?;
461
462        {
463            let mut accounts = self.state.write();
464            let state = accounts.get_or_create(&request.account_id);
465            if !state.begin_instance_creation(&db_instance_identifier) {
466                return Err(AwsServiceError::aws_error(
467                    StatusCode::BAD_REQUEST,
468                    "DBInstanceAlreadyExists",
469                    format!("DBInstance {} already exists.", db_instance_identifier),
470                ));
471            }
472            // Validate parameter group exists if specified by the caller
473            if let Some(ref pg_name) = db_parameter_group_name {
474                if !state.parameter_groups.contains_key(pg_name) {
475                    state.cancel_instance_creation(&db_instance_identifier);
476                    return Err(AwsServiceError::aws_error(
477                        StatusCode::NOT_FOUND,
478                        "DBParameterGroupNotFound",
479                        format!("DBParameterGroup {} not found.", pg_name),
480                    ));
481                }
482            }
483        }
484
485        let runtime = self.require_runtime()?.clone();
486
487        let logical_db_name = db_name
488            .clone()
489            .unwrap_or_else(|| default_db_name(&engine).to_string());
490
491        // Insert a "creating" placeholder synchronously and spawn the
492        // container start in a background task. CreateDBInstance returns
493        // ~immediately; DescribeDBInstances flips to "available" (or
494        // "failed") when the container is up. Matches AWS RDS behavior:
495        // CreateDBInstance never blocks on the container coming up.
496        let created_at = Utc::now();
497        let instance = {
498            let mut accounts = self.state.write();
499            let state = accounts.get_or_create(&request.account_id);
500            let placeholder = DbInstance {
501                db_instance_identifier: db_instance_identifier.clone(),
502                db_instance_arn: state.db_instance_arn(&db_instance_identifier),
503                db_instance_class: db_instance_class.clone(),
504                engine: engine.clone(),
505                engine_version: engine_version.clone(),
506                db_instance_status: "creating".to_string(),
507                master_username: master_username.clone(),
508                db_name: db_name.clone(),
509                endpoint_address: String::new(),
510                port: 0,
511                allocated_storage,
512                publicly_accessible,
513                deletion_protection,
514                created_at,
515                dbi_resource_id: state.next_dbi_resource_id(),
516                master_user_password: master_user_password.clone(),
517                container_id: String::new(),
518                host_port: 0,
519                tags: Vec::new(),
520                read_replica_source_db_instance_identifier: None,
521                read_replica_db_instance_identifiers: Vec::new(),
522                vpc_security_group_ids,
523                db_parameter_group_name,
524                backup_retention_period,
525                preferred_backup_window,
526                latest_restorable_time: if backup_retention_period > 0 {
527                    Some(created_at)
528                } else {
529                    None
530                },
531                option_group_name,
532                multi_az,
533                pending_modified_values: None,
534            };
535            state.finish_instance_creation(placeholder.clone());
536            placeholder
537        };
538        let instance_arn = instance.db_instance_arn.clone();
539
540        self.emit_event(
541            RdsSourceType::DbInstance,
542            &db_instance_identifier,
543            &instance_arn,
544            "RDS-EVENT-0005",
545            &["creation"],
546            "DB instance created",
547        );
548
549        {
550            let state_handle = self.state.clone();
551            let delivery_bus = self.delivery_bus.clone();
552            let runtime = runtime.clone();
553            let id = db_instance_identifier.clone();
554            let engine = engine.clone();
555            let engine_version = engine_version.clone();
556            let master_username = master_username.clone();
557            let master_user_password = master_user_password.clone();
558            let account_id = request.account_id.clone();
559            let region = request.region.clone();
560            let arn = instance_arn.clone();
561            tokio::spawn(async move {
562                match runtime
563                    .ensure_postgres(
564                        &id,
565                        &engine,
566                        &engine_version,
567                        &master_username,
568                        &master_user_password,
569                        &logical_db_name,
570                        &account_id,
571                        &region,
572                    )
573                    .await
574                {
575                    Ok(running) => {
576                        let mut accounts = state_handle.write();
577                        let state = accounts.get_or_create(&account_id);
578                        if let Some(inst) = state.instances.get_mut(&id) {
579                            inst.db_instance_status = "available".to_string();
580                            inst.endpoint_address = "127.0.0.1".to_string();
581                            inst.port = i32::from(running.host_port);
582                            inst.host_port = running.host_port;
583                            inst.container_id = running.container_id;
584                        }
585                    }
586                    Err(error) => {
587                        tracing::error!(%error, db_instance_identifier=%id, "create_db_instance background task failed");
588                        {
589                            let mut accounts = state_handle.write();
590                            let state = accounts.get_or_create(&account_id);
591                            state.instances.remove(&id);
592                        }
593                        emit_event_static(
594                            delivery_bus.as_ref(),
595                            RdsSourceType::DbInstance,
596                            &id,
597                            &arn,
598                            "RDS-EVENT-0058",
599                            &["failure"],
600                            &format!("DB instance failed to create: {}", error),
601                        );
602                    }
603                }
604            });
605        }
606
607        Ok(AwsResponse::xml(
608            StatusCode::OK,
609            xml_wrap(
610                "CreateDBInstance",
611                &format!(
612                    "<DBInstance>{}</DBInstance>",
613                    db_instance_xml(&instance, None)
614                ),
615                &request.request_id,
616            ),
617        ))
618    }
619
620    async fn delete_db_instance(
621        &self,
622        request: &AwsRequest,
623    ) -> Result<AwsResponse, AwsServiceError> {
624        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
625        let skip_final_snapshot =
626            parse_optional_bool(optional_param(request, "SkipFinalSnapshot").as_deref())?
627                .unwrap_or(false);
628        let final_db_snapshot_identifier = optional_param(request, "FinalDBSnapshotIdentifier");
629
630        if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
631            return Err(AwsServiceError::aws_error(
632                StatusCode::BAD_REQUEST,
633                "InvalidParameterCombination",
634                "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
635            ));
636        }
637        if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
638            return Err(AwsServiceError::aws_error(
639                StatusCode::BAD_REQUEST,
640                "InvalidParameterCombination",
641                "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
642            ));
643        }
644
645        // Check deletion protection BEFORE creating snapshot or making any changes
646        {
647            let accounts = self.state.read();
648            let empty = RdsState::new(&request.account_id, &request.region);
649            let state = accounts.get(&request.account_id).unwrap_or(&empty);
650            if let Some(instance) = state.instances.get(&db_instance_identifier) {
651                if instance.deletion_protection {
652                    return Err(AwsServiceError::aws_error(
653                        StatusCode::BAD_REQUEST,
654                        "InvalidDBInstanceState",
655                        format!(
656                            "DBInstance {} cannot be deleted because deletion protection is enabled.",
657                            db_instance_identifier
658                        ),
659                    ));
660                }
661            } else {
662                return Err(db_instance_not_found(&db_instance_identifier));
663            }
664        }
665
666        if let Some(ref snapshot_id) = final_db_snapshot_identifier {
667            self.create_final_db_snapshot(
668                &db_instance_identifier,
669                snapshot_id,
670                &request.account_id,
671                &request.region,
672            )
673            .await?;
674        }
675
676        let instance = {
677            let mut accounts = self.state.write();
678            let state = accounts.get_or_create(&request.account_id);
679            let instance = state
680                .instances
681                .remove(&db_instance_identifier)
682                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
683
684            if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
685                if let Some(source) = state.instances.get_mut(source_id) {
686                    source
687                        .read_replica_db_instance_identifiers
688                        .retain(|id| id != &db_instance_identifier);
689                }
690            }
691
692            for replica_id in &instance.read_replica_db_instance_identifiers {
693                if let Some(replica) = state.instances.get_mut(replica_id) {
694                    replica.read_replica_source_db_instance_identifier = None;
695                }
696            }
697
698            instance
699        };
700
701        if let Some(runtime) = &self.runtime {
702            runtime.stop_container(&db_instance_identifier).await;
703        }
704
705        self.emit_event(
706            RdsSourceType::DbInstance,
707            &db_instance_identifier,
708            &instance.db_instance_arn,
709            "RDS-EVENT-0003",
710            &["deletion"],
711            "DB instance deleted",
712        );
713
714        Ok(AwsResponse::xml(
715            StatusCode::OK,
716            xml_wrap(
717                "DeleteDBInstance",
718                &format!(
719                    "<DBInstance>{}</DBInstance>",
720                    db_instance_xml(&instance, Some("deleting"))
721                ),
722                &request.request_id,
723            ),
724        ))
725    }
726
727    /// Take a final snapshot of an instance that is about to be deleted,
728    /// persisting the dumped database into `state.snapshots`. The DLQ-style
729    /// conflict check runs twice — once under the read lock before paying
730    /// for the dump, once under the write lock before committing — to keep
731    /// concurrent deletes from colliding.
732    async fn create_final_db_snapshot(
733        &self,
734        db_instance_identifier: &str,
735        snapshot_id: &str,
736        account_id: &str,
737        region: &str,
738    ) -> Result<(), AwsServiceError> {
739        let runtime = self.runtime.as_ref().ok_or_else(|| {
740            AwsServiceError::aws_error(
741                StatusCode::SERVICE_UNAVAILABLE,
742                "InvalidParameterValue",
743                "Docker/Podman is required for RDS snapshots but is not available",
744            )
745        })?;
746
747        let (instance_for_snapshot, db_name) = {
748            let accounts = self.state.read();
749            let empty = RdsState::new(account_id, region);
750            let state = accounts.get(account_id).unwrap_or(&empty);
751
752            if state.snapshots.contains_key(snapshot_id) {
753                return Err(AwsServiceError::aws_error(
754                    StatusCode::CONFLICT,
755                    "DBSnapshotAlreadyExists",
756                    format!("DBSnapshot {snapshot_id} already exists."),
757                ));
758            }
759
760            let instance = state
761                .instances
762                .get(db_instance_identifier)
763                .cloned()
764                .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
765
766            let default_db = default_db_name(&instance.engine);
767            let db_name = instance
768                .db_name
769                .as_deref()
770                .unwrap_or(default_db)
771                .to_string();
772
773            (instance, db_name)
774        };
775
776        let dump_data = runtime
777            .dump_database(
778                db_instance_identifier,
779                &instance_for_snapshot.engine,
780                &instance_for_snapshot.master_username,
781                &instance_for_snapshot.master_user_password,
782                &db_name,
783            )
784            .await
785            .map_err(runtime_error_to_service_error)?;
786
787        let mut accounts = self.state.write();
788        let state = accounts.get_or_create(account_id);
789
790        if state.snapshots.contains_key(snapshot_id) {
791            return Err(AwsServiceError::aws_error(
792                StatusCode::CONFLICT,
793                "DBSnapshotAlreadyExists",
794                format!("DBSnapshot {snapshot_id} already exists."),
795            ));
796        }
797
798        let snapshot_arn = state.db_snapshot_arn(snapshot_id);
799
800        let snapshot = DbSnapshot {
801            db_snapshot_identifier: snapshot_id.to_string(),
802            db_snapshot_arn: snapshot_arn,
803            db_instance_identifier: db_instance_identifier.to_string(),
804            snapshot_create_time: Utc::now(),
805            engine: instance_for_snapshot.engine.clone(),
806            engine_version: instance_for_snapshot.engine_version.clone(),
807            allocated_storage: instance_for_snapshot.allocated_storage,
808            status: "available".to_string(),
809            port: instance_for_snapshot.port,
810            master_username: instance_for_snapshot.master_username.clone(),
811            db_name: instance_for_snapshot.db_name.clone(),
812            dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
813            snapshot_type: "manual".to_string(),
814            master_user_password: instance_for_snapshot.master_user_password.clone(),
815            tags: Vec::new(),
816            dump_data,
817        };
818
819        state.snapshots.insert(snapshot_id.to_string(), snapshot);
820        Ok(())
821    }
822
823    fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
824        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
825        let db_instance_class = optional_param(request, "DBInstanceClass");
826        let deletion_protection =
827            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?;
828        let apply_immediately =
829            parse_optional_bool(optional_param(request, "ApplyImmediately").as_deref())?;
830
831        // Parse VPC security group IDs - only if at least one is provided
832        let vpc_security_group_ids = {
833            let mut ids = Vec::new();
834            for index in 1.. {
835                let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
836                match optional_param(request, &sg_id_name) {
837                    Some(sg_id) => ids.push(sg_id),
838                    None => break,
839                }
840            }
841            if ids.is_empty() {
842                None
843            } else {
844                Some(ids)
845            }
846        };
847
848        if db_instance_class.is_none()
849            && deletion_protection.is_none()
850            && vpc_security_group_ids.is_none()
851        {
852            return Err(AwsServiceError::aws_error(
853                StatusCode::BAD_REQUEST,
854                "InvalidParameterCombination",
855                "At least one supported mutable field must be provided.",
856            ));
857        }
858        if let Some(ref class) = db_instance_class {
859            validate_db_instance_class(class)?;
860        }
861
862        let mut accounts = self.state.write();
863        let state = accounts.get_or_create(&request.account_id);
864        let instance = state
865            .instances
866            .get_mut(&db_instance_identifier)
867            .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
868
869        // If ApplyImmediately is false, stage changes as pending
870        if apply_immediately == Some(false) {
871            let pending = instance
872                .pending_modified_values
873                .get_or_insert(Default::default());
874            if let Some(class) = db_instance_class {
875                pending.db_instance_class = Some(class);
876            }
877            // Note: deletion_protection and vpc_security_group_ids are applied immediately
878            // regardless of ApplyImmediately flag (per AWS behavior)
879            if let Some(deletion_protection) = deletion_protection {
880                instance.deletion_protection = deletion_protection;
881            }
882            if let Some(security_group_ids) = vpc_security_group_ids {
883                instance.vpc_security_group_ids = security_group_ids;
884            }
885        } else {
886            // Apply immediately (default behavior)
887            if let Some(class) = db_instance_class {
888                instance.db_instance_class = class;
889            }
890            if let Some(deletion_protection) = deletion_protection {
891                instance.deletion_protection = deletion_protection;
892            }
893            if let Some(security_group_ids) = vpc_security_group_ids {
894                instance.vpc_security_group_ids = security_group_ids;
895            }
896        }
897        let instance_arn = instance.db_instance_arn.clone();
898        let xml = xml_wrap(
899            "ModifyDBInstance",
900            &format!(
901                "<DBInstance>{}</DBInstance>",
902                db_instance_xml(instance, Some("modifying"))
903            ),
904            &request.request_id,
905        );
906        drop(accounts);
907
908        self.emit_event(
909            RdsSourceType::DbInstance,
910            &db_instance_identifier,
911            &instance_arn,
912            "RDS-EVENT-0014",
913            &["configuration change"],
914            "DB instance was modified",
915        );
916
917        Ok(AwsResponse::xml(StatusCode::OK, xml))
918    }
919
920    async fn reboot_db_instance(
921        &self,
922        request: &AwsRequest,
923    ) -> Result<AwsResponse, AwsServiceError> {
924        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
925        let force_failover =
926            parse_optional_bool(optional_param(request, "ForceFailover").as_deref())?;
927        if force_failover == Some(true) {
928            return Err(AwsServiceError::aws_error(
929                StatusCode::BAD_REQUEST,
930                "InvalidParameterCombination",
931                "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
932            ));
933        }
934
935        let instance = {
936            let accounts = self.state.read();
937            let empty = RdsState::new(&request.account_id, &request.region);
938            let state = accounts.get(&request.account_id).unwrap_or(&empty);
939            state
940                .instances
941                .get(&db_instance_identifier)
942                .cloned()
943                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
944        };
945
946        let runtime = self.require_runtime()?;
947
948        let running = runtime
949            .restart_container(
950                &db_instance_identifier,
951                &instance.engine,
952                &instance.master_username,
953                &instance.master_user_password,
954                instance
955                    .db_name
956                    .as_deref()
957                    .unwrap_or(default_db_name(&instance.engine)),
958            )
959            .await
960            .map_err(runtime_error_to_service_error)?;
961
962        let instance = {
963            let mut accounts = self.state.write();
964            let state = accounts.get_or_create(&request.account_id);
965            let instance = state
966                .instances
967                .get_mut(&db_instance_identifier)
968                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
969            instance.host_port = running.host_port;
970            instance.port = i32::from(running.host_port);
971
972            // Apply any pending modifications
973            if let Some(pending) = instance.pending_modified_values.take() {
974                if let Some(class) = pending.db_instance_class {
975                    instance.db_instance_class = class;
976                }
977                if let Some(allocated_storage) = pending.allocated_storage {
978                    instance.allocated_storage = allocated_storage;
979                }
980                if let Some(backup_retention_period) = pending.backup_retention_period {
981                    instance.backup_retention_period = backup_retention_period;
982                }
983                if let Some(multi_az) = pending.multi_az {
984                    instance.multi_az = multi_az;
985                }
986                if let Some(engine_version) = pending.engine_version {
987                    instance.engine_version = engine_version;
988                }
989                if let Some(master_user_password) = pending.master_user_password {
990                    instance.master_user_password = master_user_password;
991                }
992            }
993
994            instance.clone()
995        };
996
997        self.emit_event(
998            RdsSourceType::DbInstance,
999            &db_instance_identifier,
1000            &instance.db_instance_arn,
1001            "RDS-EVENT-0006",
1002            &["availability"],
1003            "DB instance restarted",
1004        );
1005
1006        Ok(AwsResponse::xml(
1007            StatusCode::OK,
1008            xml_wrap(
1009                "RebootDBInstance",
1010                &format!(
1011                    "<DBInstance>{}</DBInstance>",
1012                    db_instance_xml(&instance, Some("rebooting"))
1013                ),
1014                &request.request_id,
1015            ),
1016        ))
1017    }
1018
1019    fn describe_db_engine_versions(
1020        &self,
1021        request: &AwsRequest,
1022    ) -> Result<AwsResponse, AwsServiceError> {
1023        let engine = optional_param(request, "Engine");
1024        let engine_version = optional_param(request, "EngineVersion");
1025        let family = optional_param(request, "DBParameterGroupFamily");
1026        let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
1027
1028        let mut versions = filter_engine_versions(
1029            &default_engine_versions(),
1030            &engine,
1031            &engine_version,
1032            &family,
1033        );
1034
1035        if default_only.unwrap_or(false) {
1036            versions.truncate(1);
1037        }
1038
1039        Ok(AwsResponse::xml(
1040            StatusCode::OK,
1041            xml_wrap(
1042                "DescribeDBEngineVersions",
1043                &format!(
1044                    "<DBEngineVersions>{}</DBEngineVersions>",
1045                    versions.iter().map(engine_version_xml).collect::<String>()
1046                ),
1047                &request.request_id,
1048            ),
1049        ))
1050    }
1051
1052    fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1053        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1054        let marker = optional_param(request, "Marker");
1055        let max_records = optional_param(request, "MaxRecords");
1056
1057        let accounts = self.state.read();
1058        let empty = RdsState::new(&request.account_id, &request.region);
1059        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1060
1061        // If specific identifier requested, return just that one (no pagination)
1062        if let Some(identifier) = db_instance_identifier {
1063            let instance = state
1064                .instances
1065                .get(&identifier)
1066                .cloned()
1067                .ok_or_else(|| db_instance_not_found(&identifier))?;
1068
1069            return Ok(AwsResponse::xml(
1070                StatusCode::OK,
1071                xml_wrap(
1072                    "DescribeDBInstances",
1073                    &format!(
1074                        "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
1075                        db_instance_xml(&instance, None)
1076                    ),
1077                    &request.request_id,
1078                ),
1079            ));
1080        }
1081
1082        // Get all instances sorted by created_at, then identifier
1083        let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
1084        instances.sort_by(|a, b| {
1085            a.created_at
1086                .cmp(&b.created_at)
1087                .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
1088        });
1089
1090        // Apply pagination
1091        let paginated = paginate(instances, marker, max_records, |inst| {
1092            &inst.db_instance_identifier
1093        })?;
1094
1095        let marker_xml = paginated
1096            .next_marker
1097            .as_ref()
1098            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1099            .unwrap_or_default();
1100
1101        Ok(AwsResponse::xml(
1102            StatusCode::OK,
1103            xml_wrap(
1104                "DescribeDBInstances",
1105                &format!(
1106                    "<DBInstances>{}</DBInstances>{}",
1107                    paginated
1108                        .items
1109                        .iter()
1110                        .map(|instance| {
1111                            format!(
1112                                "<DBInstance>{}</DBInstance>",
1113                                db_instance_xml(instance, None)
1114                            )
1115                        })
1116                        .collect::<String>(),
1117                    marker_xml
1118                ),
1119                &request.request_id,
1120            ),
1121        ))
1122    }
1123
1124    fn describe_orderable_db_instance_options(
1125        &self,
1126        request: &AwsRequest,
1127    ) -> Result<AwsResponse, AwsServiceError> {
1128        let engine = optional_param(request, "Engine");
1129        let engine_version = optional_param(request, "EngineVersion");
1130        let db_instance_class = optional_param(request, "DBInstanceClass");
1131        let license_model = optional_param(request, "LicenseModel");
1132        let vpc = parse_optional_bool(optional_param(request, "Vpc").as_deref())?;
1133
1134        let options = filter_orderable_options(
1135            &default_orderable_options(),
1136            &engine,
1137            &engine_version,
1138            &db_instance_class,
1139            &license_model,
1140            vpc,
1141        );
1142
1143        Ok(AwsResponse::xml(
1144            StatusCode::OK,
1145            xml_wrap(
1146                "DescribeOrderableDBInstanceOptions",
1147                &format!(
1148                    "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
1149                    options.iter().map(orderable_option_xml).collect::<String>()
1150                ),
1151                &request.request_id,
1152            ),
1153        ))
1154    }
1155
1156    fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1157        let resource_name = required_param(request, "ResourceName")?;
1158        let tags = parse_tags(request)?;
1159
1160        if tags.is_empty() {
1161            return Err(AwsServiceError::aws_error(
1162                StatusCode::BAD_REQUEST,
1163                "MissingParameter",
1164                "The request must contain the parameter Tags.",
1165            ));
1166        }
1167
1168        let mut accounts = self.state.write();
1169        let state = accounts.get_or_create(&request.account_id);
1170        let instance = find_instance_by_arn_mut(state, &resource_name)?;
1171        merge_tags(&mut instance.tags, &tags);
1172
1173        Ok(AwsResponse::xml(
1174            StatusCode::OK,
1175            xml_wrap("AddTagsToResource", "", &request.request_id),
1176        ))
1177    }
1178
1179    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1180        let resource_name = required_param(request, "ResourceName")?;
1181        if query_param_prefix_exists(request, "Filters.") {
1182            return Err(AwsServiceError::aws_error(
1183                StatusCode::BAD_REQUEST,
1184                "InvalidParameterValue",
1185                "Filters are not yet supported for ListTagsForResource.",
1186            ));
1187        }
1188
1189        let accounts = self.state.read();
1190        let empty = RdsState::new(&request.account_id, &request.region);
1191        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1192        let instance = find_instance_by_arn(state, &resource_name)?;
1193        let tag_xml = instance.tags.iter().map(tag_xml).collect::<String>();
1194
1195        Ok(AwsResponse::xml(
1196            StatusCode::OK,
1197            xml_wrap(
1198                "ListTagsForResource",
1199                &format!("<TagList>{tag_xml}</TagList>"),
1200                &request.request_id,
1201            ),
1202        ))
1203    }
1204
1205    fn remove_tags_from_resource(
1206        &self,
1207        request: &AwsRequest,
1208    ) -> Result<AwsResponse, AwsServiceError> {
1209        let resource_name = required_param(request, "ResourceName")?;
1210        let tag_keys = parse_tag_keys(request)?;
1211
1212        if tag_keys.is_empty() {
1213            return Err(AwsServiceError::aws_error(
1214                StatusCode::BAD_REQUEST,
1215                "MissingParameter",
1216                "The request must contain the parameter TagKeys.",
1217            ));
1218        }
1219
1220        let mut accounts = self.state.write();
1221        let state = accounts.get_or_create(&request.account_id);
1222        let instance = find_instance_by_arn_mut(state, &resource_name)?;
1223        instance
1224            .tags
1225            .retain(|tag| !tag_keys.iter().any(|key| key == &tag.key));
1226
1227        Ok(AwsResponse::xml(
1228            StatusCode::OK,
1229            xml_wrap("RemoveTagsFromResource", "", &request.request_id),
1230        ))
1231    }
1232
1233    async fn create_db_snapshot(
1234        &self,
1235        request: &AwsRequest,
1236    ) -> Result<AwsResponse, AwsServiceError> {
1237        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1238        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1239
1240        let runtime = self.runtime.as_ref().ok_or_else(|| {
1241            AwsServiceError::aws_error(
1242                StatusCode::SERVICE_UNAVAILABLE,
1243                "InvalidParameterValue",
1244                "Docker/Podman is required for RDS snapshots but is not available",
1245            )
1246        })?;
1247
1248        let (instance, db_name) = {
1249            let accounts = self.state.read();
1250            let empty = RdsState::new(&request.account_id, &request.region);
1251            let state = accounts.get(&request.account_id).unwrap_or(&empty);
1252
1253            if state.snapshots.contains_key(&db_snapshot_identifier) {
1254                return Err(AwsServiceError::aws_error(
1255                    StatusCode::CONFLICT,
1256                    "DBSnapshotAlreadyExists",
1257                    format!("DBSnapshot {db_snapshot_identifier} already exists."),
1258                ));
1259            }
1260
1261            let instance = state
1262                .instances
1263                .get(&db_instance_identifier)
1264                .cloned()
1265                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1266
1267            let default_db = default_db_name(&instance.engine);
1268            let db_name = instance
1269                .db_name
1270                .as_deref()
1271                .unwrap_or(default_db)
1272                .to_string();
1273
1274            (instance, db_name)
1275        };
1276
1277        let dump_data = runtime
1278            .dump_database(
1279                &db_instance_identifier,
1280                &instance.engine,
1281                &instance.master_username,
1282                &instance.master_user_password,
1283                &db_name,
1284            )
1285            .await
1286            .map_err(runtime_error_to_service_error)?;
1287
1288        let mut accounts = self.state.write();
1289        let state = accounts.get_or_create(&request.account_id);
1290
1291        if state.snapshots.contains_key(&db_snapshot_identifier) {
1292            return Err(AwsServiceError::aws_error(
1293                StatusCode::CONFLICT,
1294                "DBSnapshotAlreadyExists",
1295                format!("DBSnapshot {db_snapshot_identifier} already exists."),
1296            ));
1297        }
1298
1299        let snapshot = DbSnapshot {
1300            db_snapshot_identifier: db_snapshot_identifier.clone(),
1301            db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
1302            db_instance_identifier: instance.db_instance_identifier.clone(),
1303            snapshot_create_time: Utc::now(),
1304            engine: instance.engine.clone(),
1305            engine_version: instance.engine_version.clone(),
1306            allocated_storage: instance.allocated_storage,
1307            status: "available".to_string(),
1308            port: instance.port,
1309            master_username: instance.master_username.clone(),
1310            db_name: instance.db_name.clone(),
1311            dbi_resource_id: instance.dbi_resource_id.clone(),
1312            snapshot_type: "manual".to_string(),
1313            master_user_password: instance.master_user_password.clone(),
1314            tags: Vec::new(),
1315            dump_data,
1316        };
1317
1318        state
1319            .snapshots
1320            .insert(db_snapshot_identifier.clone(), snapshot.clone());
1321        let snapshot_arn = snapshot.db_snapshot_arn.clone();
1322        drop(accounts);
1323
1324        self.emit_event(
1325            RdsSourceType::DbSnapshot,
1326            &db_snapshot_identifier,
1327            &snapshot_arn,
1328            "RDS-EVENT-0042",
1329            &["creation"],
1330            "Manual snapshot created",
1331        );
1332
1333        Ok(AwsResponse::xml(
1334            StatusCode::OK,
1335            xml_wrap(
1336                "CreateDBSnapshot",
1337                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1338                &request.request_id,
1339            ),
1340        ))
1341    }
1342
1343    fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1344        let db_snapshot_identifier = optional_param(request, "DBSnapshotIdentifier");
1345        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1346        let marker = optional_param(request, "Marker");
1347        let max_records = optional_param(request, "MaxRecords");
1348
1349        if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
1350            return Err(AwsServiceError::aws_error(
1351                StatusCode::BAD_REQUEST,
1352                "InvalidParameterCombination",
1353                "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
1354            ));
1355        }
1356
1357        let accounts = self.state.read();
1358        let empty = RdsState::new(&request.account_id, &request.region);
1359        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1360
1361        // If specific snapshot requested, return just that one (no pagination)
1362        if let Some(snapshot_id) = db_snapshot_identifier {
1363            let snapshot = state
1364                .snapshots
1365                .get(&snapshot_id)
1366                .cloned()
1367                .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
1368
1369            return Ok(AwsResponse::xml(
1370                StatusCode::OK,
1371                xml_wrap(
1372                    "DescribeDBSnapshots",
1373                    &format!(
1374                        "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
1375                        db_snapshot_xml(&snapshot)
1376                    ),
1377                    &request.request_id,
1378                ),
1379            ));
1380        }
1381
1382        // Get snapshots, filtered by instance identifier if provided
1383        let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
1384            state
1385                .snapshots
1386                .values()
1387                .filter(|s| s.db_instance_identifier == instance_id)
1388                .cloned()
1389                .collect()
1390        } else {
1391            state.snapshots.values().cloned().collect()
1392        };
1393
1394        // Sort by creation time, then identifier
1395        snapshots.sort_by(|a, b| {
1396            a.snapshot_create_time
1397                .cmp(&b.snapshot_create_time)
1398                .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
1399        });
1400
1401        // Apply pagination
1402        let paginated = paginate(snapshots, marker, max_records, |snap| {
1403            &snap.db_snapshot_identifier
1404        })?;
1405
1406        let marker_xml = paginated
1407            .next_marker
1408            .as_ref()
1409            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1410            .unwrap_or_default();
1411
1412        Ok(AwsResponse::xml(
1413            StatusCode::OK,
1414            xml_wrap(
1415                "DescribeDBSnapshots",
1416                &format!(
1417                    "<DBSnapshots>{}</DBSnapshots>{}",
1418                    paginated
1419                        .items
1420                        .iter()
1421                        .map(|snapshot| format!(
1422                            "<DBSnapshot>{}</DBSnapshot>",
1423                            db_snapshot_xml(snapshot)
1424                        ))
1425                        .collect::<String>(),
1426                    marker_xml
1427                ),
1428                &request.request_id,
1429            ),
1430        ))
1431    }
1432
1433    fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1434        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1435
1436        let mut accounts = self.state.write();
1437        let state = accounts.get_or_create(&request.account_id);
1438
1439        let snapshot = state
1440            .snapshots
1441            .remove(&db_snapshot_identifier)
1442            .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1443        let snapshot_arn = snapshot.db_snapshot_arn.clone();
1444        drop(accounts);
1445
1446        self.emit_event(
1447            RdsSourceType::DbSnapshot,
1448            &db_snapshot_identifier,
1449            &snapshot_arn,
1450            "RDS-EVENT-0041",
1451            &["deletion"],
1452            "Manual snapshot deleted",
1453        );
1454
1455        Ok(AwsResponse::xml(
1456            StatusCode::OK,
1457            xml_wrap(
1458                "DeleteDBSnapshot",
1459                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1460                &request.request_id,
1461            ),
1462        ))
1463    }
1464
1465    async fn restore_db_instance_from_db_snapshot(
1466        &self,
1467        request: &AwsRequest,
1468    ) -> Result<AwsResponse, AwsServiceError> {
1469        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1470        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1471        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
1472
1473        let runtime = self.require_runtime()?;
1474
1475        let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
1476            let mut accounts = self.state.write();
1477            let state = accounts.get_or_create(&request.account_id);
1478
1479            if !state.begin_instance_creation(&db_instance_identifier) {
1480                return Err(AwsServiceError::aws_error(
1481                    StatusCode::CONFLICT,
1482                    "DBInstanceAlreadyExists",
1483                    format!("DBInstance {db_instance_identifier} already exists."),
1484                ));
1485            }
1486
1487            let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
1488                Some(s) => s,
1489                None => {
1490                    state.cancel_instance_creation(&db_instance_identifier);
1491                    return Err(db_snapshot_not_found(&db_snapshot_identifier));
1492                }
1493            };
1494
1495            let dbi_resource_id = state.next_dbi_resource_id();
1496            let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
1497            let created_at = Utc::now();
1498
1499            (snapshot, dbi_resource_id, db_instance_arn, created_at)
1500        };
1501
1502        let db_name = snapshot
1503            .db_name
1504            .as_deref()
1505            .unwrap_or(default_db_name(&snapshot.engine));
1506        let running = match runtime
1507            .ensure_postgres(
1508                &db_instance_identifier,
1509                &snapshot.engine,
1510                &snapshot.engine_version,
1511                &snapshot.master_username,
1512                &snapshot.master_user_password,
1513                db_name,
1514                &request.account_id,
1515                &request.region,
1516            )
1517            .await
1518        {
1519            Ok(running) => running,
1520            Err(e) => {
1521                self.state
1522                    .write()
1523                    .get_or_create(&request.account_id)
1524                    .cancel_instance_creation(&db_instance_identifier);
1525                return Err(runtime_error_to_service_error(e));
1526            }
1527        };
1528
1529        if let Err(e) = runtime
1530            .restore_database(
1531                &db_instance_identifier,
1532                &snapshot.engine,
1533                &snapshot.master_username,
1534                &snapshot.master_user_password,
1535                db_name,
1536                &snapshot.dump_data,
1537            )
1538            .await
1539        {
1540            self.state
1541                .write()
1542                .get_or_create(&request.account_id)
1543                .cancel_instance_creation(&db_instance_identifier);
1544            runtime.stop_container(&db_instance_identifier).await;
1545            return Err(runtime_error_to_service_error(e));
1546        }
1547
1548        let instance = build_restored_instance(
1549            &db_instance_identifier,
1550            db_instance_arn,
1551            dbi_resource_id,
1552            created_at,
1553            vpc_security_group_ids,
1554            &snapshot,
1555            &running,
1556        );
1557
1558        self.state
1559            .write()
1560            .get_or_create(&request.account_id)
1561            .finish_instance_creation(instance.clone());
1562
1563        self.emit_event(
1564            RdsSourceType::DbInstance,
1565            &db_instance_identifier,
1566            &instance.db_instance_arn,
1567            "RDS-EVENT-0043",
1568            &["creation"],
1569            "DB instance restored from snapshot",
1570        );
1571
1572        Ok(AwsResponse::xml(
1573            StatusCode::OK,
1574            xml_wrap(
1575                "RestoreDBInstanceFromDBSnapshot",
1576                &format!(
1577                    "<DBInstance>{}</DBInstance>",
1578                    db_instance_xml(&instance, None)
1579                ),
1580                &request.request_id,
1581            ),
1582        ))
1583    }
1584
    /// Handles `CreateDBInstanceReadReplica`.
    ///
    /// Flow, as implemented below:
    /// 1. Read `DBInstanceIdentifier` and `SourceDBInstanceIdentifier` and
    ///    require a configured runtime.
    /// 2. Under the state write lock, reserve the new identifier with
    ///    `begin_instance_creation` and clone the source instance.
    /// 3. Dump the source database, start a container for the replica using
    ///    the source's engine, version and credentials, and restore the dump
    ///    into it.
    /// 4. Re-acquire the write lock, append the replica identifier to the
    ///    source's `read_replica_db_instance_identifiers`, finish the
    ///    creation, and emit `RDS-EVENT-0005`.
    ///
    /// Every failure after step 2 calls `cancel_instance_creation` for the new
    /// identifier; failures after `ensure_postgres` succeeded additionally
    /// call `stop_container` for it.
    ///
    /// # Errors
    /// - whatever `required_param` returns for a missing parameter;
    /// - `InvalidParameterValue` (503) when `self.runtime` is `None`;
    /// - `DBInstanceAlreadyExists` (409) when `begin_instance_creation`
    ///   returns `false`;
    /// - `db_instance_not_found(..)` when the source instance is absent, either
    ///   at the start or when the replica is about to be registered;
    /// - any runtime error, mapped through `runtime_error_to_service_error`.
    async fn create_db_instance_read_replica(
        &self,
        request: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
        let source_db_instance_identifier = required_param(request, "SourceDBInstanceIdentifier")?;

        // The replica is backed by a real container, so a runtime is mandatory.
        let runtime = self.runtime.as_ref().ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::SERVICE_UNAVAILABLE,
                "InvalidParameterValue",
                "Docker/Podman is required for RDS read replicas but is not available",
            )
        })?;

        // Scoped block: the write guard is dropped at the closing brace, before
        // any of the `.await` points below.
        let (source_instance, db_name) = {
            let mut accounts = self.state.write();
            let state = accounts.get_or_create(&request.account_id);

            // Reserve the identifier first; a `false` return is reported as a conflict.
            if !state.begin_instance_creation(&db_instance_identifier) {
                return Err(AwsServiceError::aws_error(
                    StatusCode::CONFLICT,
                    "DBInstanceAlreadyExists",
                    format!("DBInstance {db_instance_identifier} already exists."),
                ));
            }

            let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
            {
                Some(inst) => inst,
                None => {
                    // Undo the reservation made just above before bailing out.
                    state.cancel_instance_creation(&db_instance_identifier);
                    return Err(db_instance_not_found(&source_db_instance_identifier));
                }
            };

            // Fall back to the engine's default database name when the source has none.
            let default_db = default_db_name(&source_instance.engine);
            let db_name = source_instance
                .db_name
                .as_deref()
                .unwrap_or(default_db)
                .to_string();

            (source_instance, db_name)
        };

        // Take a dump of the source database; it is restored into the replica below.
        let dump_data = match runtime
            .dump_database(
                &source_db_instance_identifier,
                &source_instance.engine,
                &source_instance.master_username,
                &source_instance.master_user_password,
                &db_name,
            )
            .await
        {
            Ok(data) => data,
            Err(e) => {
                // No container exists for the replica yet, so only the reservation is undone.
                self.state
                    .write()
                    .get_or_create(&request.account_id)
                    .cancel_instance_creation(&db_instance_identifier);
                return Err(runtime_error_to_service_error(e));
            }
        };

        // Derive the resource id and ARN under a read lock; a throwaway state is
        // used if the account entry is not present.
        let (dbi_resource_id, db_instance_arn) = {
            let accounts = self.state.read();
            let empty = RdsState::new(&request.account_id, &request.region);
            let s = accounts.get(&request.account_id).unwrap_or(&empty);
            (
                s.next_dbi_resource_id(),
                s.db_instance_arn(&db_instance_identifier),
            )
        };
        let created_at = Utc::now();

        // Start the replica's container with the source's engine settings and credentials.
        let running = match runtime
            .ensure_postgres(
                &db_instance_identifier,
                &source_instance.engine,
                &source_instance.engine_version,
                &source_instance.master_username,
                &source_instance.master_user_password,
                &db_name,
                &request.account_id,
                &request.region,
            )
            .await
        {
            Ok(running) => running,
            Err(e) => {
                self.state
                    .write()
                    .get_or_create(&request.account_id)
                    .cancel_instance_creation(&db_instance_identifier);
                return Err(runtime_error_to_service_error(e));
            }
        };

        // Load the source dump into the freshly started replica.
        if let Err(e) = runtime
            .restore_database(
                &db_instance_identifier,
                &source_instance.engine,
                &source_instance.master_username,
                &source_instance.master_user_password,
                &db_name,
                &dump_data,
            )
            .await
        {
            // The container is running at this point, so it is stopped as well.
            self.state
                .write()
                .get_or_create(&request.account_id)
                .cancel_instance_creation(&db_instance_identifier);
            runtime.stop_container(&db_instance_identifier).await;
            return Err(runtime_error_to_service_error(e));
        }

        let replica = build_read_replica_instance(
            &db_instance_identifier,
            db_instance_arn,
            dbi_resource_id,
            created_at,
            &source_db_instance_identifier,
            &source_instance,
            &running,
        );

        // Look the source up again: the state lock was not held during the
        // runtime calls above, so it may no longer be present. The flag is
        // computed inside a block so the guard is released before the
        // `.await` in the cleanup path.
        let source_missing = {
            let mut accounts = self.state.write();
            let state = accounts.get_or_create(&request.account_id);
            match state.instances.get_mut(&source_db_instance_identifier) {
                Some(source) => {
                    source
                        .read_replica_db_instance_identifiers
                        .push(db_instance_identifier.clone());
                    state.finish_instance_creation(replica.clone());
                    false
                }
                None => {
                    state.cancel_instance_creation(&db_instance_identifier);
                    true
                }
            }
        };

        if source_missing {
            runtime.stop_container(&db_instance_identifier).await;
            return Err(db_instance_not_found(&source_db_instance_identifier));
        }

        self.emit_event(
            RdsSourceType::DbInstance,
            &db_instance_identifier,
            &replica.db_instance_arn,
            "RDS-EVENT-0005",
            &["creation", "read replica"],
            "Read replica DB instance created",
        );

        Ok(AwsResponse::xml(
            StatusCode::OK,
            xml_wrap(
                "CreateDBInstanceReadReplica",
                &format!(
                    "<DBInstance>{}</DBInstance>",
                    db_instance_xml(&replica, None)
                ),
                &request.request_id,
            ),
        ))
    }
1758
1759    fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1760        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1761        let db_subnet_group_description = required_param(request, "DBSubnetGroupDescription")?;
1762        let subnet_ids = parse_subnet_ids(request)?;
1763
1764        if subnet_ids.is_empty() {
1765            return Err(AwsServiceError::aws_error(
1766                StatusCode::BAD_REQUEST,
1767                "InvalidParameterValue",
1768                "At least one subnet must be specified.",
1769            ));
1770        }
1771
1772        if subnet_ids.len() < 2 {
1773            return Err(AwsServiceError::aws_error(
1774                StatusCode::BAD_REQUEST,
1775                "DBSubnetGroupDoesNotCoverEnoughAZs",
1776                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1777            ));
1778        }
1779
1780        let mut accounts = self.state.write();
1781        let state = accounts.get_or_create(&request.account_id);
1782
1783        if state.subnet_groups.contains_key(&db_subnet_group_name) {
1784            return Err(AwsServiceError::aws_error(
1785                StatusCode::CONFLICT,
1786                "DBSubnetGroupAlreadyExists",
1787                format!("DBSubnetGroup {db_subnet_group_name} already exists."),
1788            ));
1789        }
1790
1791        let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
1792        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1793            .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
1794            .collect();
1795
1796        // Validate that subnets span at least 2 unique Availability Zones
1797        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1798        if unique_azs.len() < 2 {
1799            return Err(AwsServiceError::aws_error(
1800                StatusCode::BAD_REQUEST,
1801                "DBSubnetGroupDoesNotCoverEnoughAZs",
1802                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1803            ));
1804        }
1805
1806        let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
1807        let tags = parse_tags(request)?;
1808
1809        let subnet_group = DbSubnetGroup {
1810            db_subnet_group_name: db_subnet_group_name.clone(),
1811            db_subnet_group_arn,
1812            db_subnet_group_description,
1813            vpc_id,
1814            subnet_ids,
1815            subnet_availability_zones,
1816            tags,
1817        };
1818
1819        state
1820            .subnet_groups
1821            .insert(db_subnet_group_name, subnet_group.clone());
1822
1823        Ok(AwsResponse::xml(
1824            StatusCode::OK,
1825            xml_wrap(
1826                "CreateDBSubnetGroup",
1827                &format!(
1828                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1829                    db_subnet_group_xml(&subnet_group)
1830                ),
1831                &request.request_id,
1832            ),
1833        ))
1834    }
1835
1836    fn describe_db_subnet_groups(
1837        &self,
1838        request: &AwsRequest,
1839    ) -> Result<AwsResponse, AwsServiceError> {
1840        let db_subnet_group_name = optional_param(request, "DBSubnetGroupName");
1841        let marker = optional_param(request, "Marker");
1842        let max_records = optional_param(request, "MaxRecords");
1843
1844        let accounts = self.state.read();
1845        let empty = RdsState::new(&request.account_id, &request.region);
1846        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1847
1848        // If specific subnet group requested, return just that one (no pagination)
1849        if let Some(name) = db_subnet_group_name {
1850            let sg = state.subnet_groups.get(&name).ok_or_else(|| {
1851                AwsServiceError::aws_error(
1852                    StatusCode::NOT_FOUND,
1853                    "DBSubnetGroupNotFoundFault",
1854                    format!("DBSubnetGroup {} not found.", name),
1855                )
1856            })?;
1857
1858            return Ok(AwsResponse::xml(
1859                StatusCode::OK,
1860                xml_wrap(
1861                    "DescribeDBSubnetGroups",
1862                    &format!(
1863                        "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
1864                        db_subnet_group_xml(sg)
1865                    ),
1866                    &request.request_id,
1867                ),
1868            ));
1869        }
1870
1871        // Get all subnet groups sorted by name
1872        let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
1873        subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
1874
1875        // Apply pagination
1876        let paginated = paginate(subnet_groups, marker, max_records, |sg| {
1877            &sg.db_subnet_group_name
1878        })?;
1879
1880        let marker_xml = paginated
1881            .next_marker
1882            .as_ref()
1883            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1884            .unwrap_or_default();
1885
1886        let body = paginated
1887            .items
1888            .iter()
1889            .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
1890            .collect::<Vec<_>>()
1891            .join("");
1892
1893        Ok(AwsResponse::xml(
1894            StatusCode::OK,
1895            xml_wrap(
1896                "DescribeDBSubnetGroups",
1897                &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
1898                &request.request_id,
1899            ),
1900        ))
1901    }
1902
1903    fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1904        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1905
1906        let mut accounts = self.state.write();
1907        let state = accounts.get_or_create(&request.account_id);
1908
1909        if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
1910            return Err(AwsServiceError::aws_error(
1911                StatusCode::NOT_FOUND,
1912                "DBSubnetGroupNotFoundFault",
1913                format!("DBSubnetGroup {db_subnet_group_name} not found."),
1914            ));
1915        }
1916
1917        Ok(AwsResponse::xml(
1918            StatusCode::OK,
1919            xml_wrap("DeleteDBSubnetGroup", "", &request.request_id),
1920        ))
1921    }
1922
1923    fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1924        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1925        let subnet_ids = parse_subnet_ids(request)?;
1926
1927        if subnet_ids.is_empty() {
1928            return Err(AwsServiceError::aws_error(
1929                StatusCode::BAD_REQUEST,
1930                "InvalidParameterValue",
1931                "At least one subnet must be specified.",
1932            ));
1933        }
1934
1935        if subnet_ids.len() < 2 {
1936            return Err(AwsServiceError::aws_error(
1937                StatusCode::BAD_REQUEST,
1938                "DBSubnetGroupDoesNotCoverEnoughAZs",
1939                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1940            ));
1941        }
1942
1943        let mut accounts = self.state.write();
1944        let state = accounts.get_or_create(&request.account_id);
1945
1946        let region = state.region.clone();
1947
1948        let subnet_group = state
1949            .subnet_groups
1950            .get_mut(&db_subnet_group_name)
1951            .ok_or_else(|| {
1952                AwsServiceError::aws_error(
1953                    StatusCode::NOT_FOUND,
1954                    "DBSubnetGroupNotFoundFault",
1955                    format!("DBSubnetGroup {db_subnet_group_name} not found."),
1956                )
1957            })?;
1958
1959        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1960            .map(|i| format!("{}{}", &region, char::from(b'a' + (i % 6) as u8)))
1961            .collect();
1962
1963        // Validate that subnets span at least 2 unique Availability Zones
1964        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1965        if unique_azs.len() < 2 {
1966            return Err(AwsServiceError::aws_error(
1967                StatusCode::BAD_REQUEST,
1968                "DBSubnetGroupDoesNotCoverEnoughAZs",
1969                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1970            ));
1971        }
1972
1973        subnet_group.subnet_ids = subnet_ids;
1974        subnet_group.subnet_availability_zones = subnet_availability_zones;
1975
1976        let subnet_group_clone = subnet_group.clone();
1977
1978        Ok(AwsResponse::xml(
1979            StatusCode::OK,
1980            xml_wrap(
1981                "ModifyDBSubnetGroup",
1982                &format!(
1983                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1984                    db_subnet_group_xml(&subnet_group_clone)
1985                ),
1986                &request.request_id,
1987            ),
1988        ))
1989    }
1990
1991    fn create_db_parameter_group(
1992        &self,
1993        request: &AwsRequest,
1994    ) -> Result<AwsResponse, AwsServiceError> {
1995        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1996        let db_parameter_group_family = required_param(request, "DBParameterGroupFamily")?;
1997        let description = required_param(request, "Description")?;
1998
1999        // Validate parameter group family against supported engines and versions
2000        let valid_families = [
2001            "postgres16",
2002            "postgres15",
2003            "postgres14",
2004            "postgres13",
2005            "mysql8.0",
2006            "mysql5.7",
2007            "mariadb10.11",
2008            "mariadb10.6",
2009        ];
2010
2011        if !valid_families.contains(&db_parameter_group_family.as_str()) {
2012            return Err(AwsServiceError::aws_error(
2013                StatusCode::BAD_REQUEST,
2014                "InvalidParameterValue",
2015                format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
2016            ));
2017        }
2018
2019        let mut accounts = self.state.write();
2020        let state = accounts.get_or_create(&request.account_id);
2021
2022        if state
2023            .parameter_groups
2024            .contains_key(&db_parameter_group_name)
2025        {
2026            return Err(AwsServiceError::aws_error(
2027                StatusCode::CONFLICT,
2028                "DBParameterGroupAlreadyExists",
2029                format!("DBParameterGroup {db_parameter_group_name} already exists."),
2030            ));
2031        }
2032
2033        let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
2034        let tags = parse_tags(request)?;
2035
2036        let parameter_group = DbParameterGroup {
2037            db_parameter_group_name: db_parameter_group_name.clone(),
2038            db_parameter_group_arn,
2039            db_parameter_group_family,
2040            description,
2041            parameters: std::collections::HashMap::new(),
2042            tags,
2043        };
2044
2045        state
2046            .parameter_groups
2047            .insert(db_parameter_group_name, parameter_group.clone());
2048
2049        Ok(AwsResponse::xml(
2050            StatusCode::OK,
2051            xml_wrap(
2052                "CreateDBParameterGroup",
2053                &format!(
2054                    "<DBParameterGroup>{}</DBParameterGroup>",
2055                    db_parameter_group_xml(&parameter_group)
2056                ),
2057                &request.request_id,
2058            ),
2059        ))
2060    }
2061
2062    fn describe_db_parameter_groups(
2063        &self,
2064        request: &AwsRequest,
2065    ) -> Result<AwsResponse, AwsServiceError> {
2066        let db_parameter_group_name = optional_param(request, "DBParameterGroupName");
2067        let marker = optional_param(request, "Marker");
2068        let max_records = optional_param(request, "MaxRecords");
2069
2070        let accounts = self.state.read();
2071        let empty = RdsState::new(&request.account_id, &request.region);
2072        let state = accounts.get(&request.account_id).unwrap_or(&empty);
2073
2074        // If specific parameter group requested, return just that one (no pagination)
2075        if let Some(name) = db_parameter_group_name {
2076            let pg = state.parameter_groups.get(&name).ok_or_else(|| {
2077                AwsServiceError::aws_error(
2078                    StatusCode::NOT_FOUND,
2079                    "DBParameterGroupNotFound",
2080                    format!("DBParameterGroup {} not found.", name),
2081                )
2082            })?;
2083
2084            return Ok(AwsResponse::xml(
2085                StatusCode::OK,
2086                xml_wrap(
2087                    "DescribeDBParameterGroups",
2088                    &format!(
2089                        "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
2090                        db_parameter_group_xml(pg)
2091                    ),
2092                    &request.request_id,
2093                ),
2094            ));
2095        }
2096
2097        // Get all parameter groups sorted by name
2098        let mut parameter_groups: Vec<DbParameterGroup> =
2099            state.parameter_groups.values().cloned().collect();
2100        parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
2101
2102        // Apply pagination
2103        let paginated = paginate(parameter_groups, marker, max_records, |pg| {
2104            &pg.db_parameter_group_name
2105        })?;
2106
2107        let marker_xml = paginated
2108            .next_marker
2109            .as_ref()
2110            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
2111            .unwrap_or_default();
2112
2113        let body = paginated
2114            .items
2115            .iter()
2116            .map(|pg| {
2117                format!(
2118                    "<DBParameterGroup>{}</DBParameterGroup>",
2119                    db_parameter_group_xml(pg)
2120                )
2121            })
2122            .collect::<Vec<_>>()
2123            .join("");
2124
2125        Ok(AwsResponse::xml(
2126            StatusCode::OK,
2127            xml_wrap(
2128                "DescribeDBParameterGroups",
2129                &format!(
2130                    "<DBParameterGroups>{}</DBParameterGroups>{}",
2131                    body, marker_xml
2132                ),
2133                &request.request_id,
2134            ),
2135        ))
2136    }
2137
2138    fn delete_db_parameter_group(
2139        &self,
2140        request: &AwsRequest,
2141    ) -> Result<AwsResponse, AwsServiceError> {
2142        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2143
2144        let mut accounts = self.state.write();
2145        let state = accounts.get_or_create(&request.account_id);
2146
2147        if db_parameter_group_name.starts_with("default.") {
2148            return Err(AwsServiceError::aws_error(
2149                StatusCode::BAD_REQUEST,
2150                "InvalidParameterValue",
2151                "Cannot delete default parameter groups.",
2152            ));
2153        }
2154
2155        if state
2156            .parameter_groups
2157            .remove(&db_parameter_group_name)
2158            .is_none()
2159        {
2160            return Err(AwsServiceError::aws_error(
2161                StatusCode::NOT_FOUND,
2162                "DBParameterGroupNotFound",
2163                format!("DBParameterGroup {db_parameter_group_name} not found."),
2164            ));
2165        }
2166
2167        Ok(AwsResponse::xml(
2168            StatusCode::OK,
2169            xml_wrap("DeleteDBParameterGroup", "", &request.request_id),
2170        ))
2171    }
2172
2173    fn modify_db_parameter_group(
2174        &self,
2175        request: &AwsRequest,
2176    ) -> Result<AwsResponse, AwsServiceError> {
2177        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2178
2179        let mut accounts = self.state.write();
2180        let state = accounts.get_or_create(&request.account_id);
2181
2182        let parameter_group = state
2183            .parameter_groups
2184            .get_mut(&db_parameter_group_name)
2185            .ok_or_else(|| {
2186                AwsServiceError::aws_error(
2187                    StatusCode::NOT_FOUND,
2188                    "DBParameterGroupNotFound",
2189                    format!("DBParameterGroup {db_parameter_group_name} not found."),
2190                )
2191            })?;
2192
2193        if let Some(new_description) = optional_param(request, "Description") {
2194            parameter_group.description = new_description;
2195        }
2196
2197        let parameter_group_clone = parameter_group.clone();
2198
2199        Ok(AwsResponse::xml(
2200            StatusCode::OK,
2201            xml_wrap(
2202                "ModifyDBParameterGroup",
2203                &format!(
2204                    "<DBParameterGroupName>{}</DBParameterGroupName>",
2205                    xml_escape(&parameter_group_clone.db_parameter_group_name)
2206                ),
2207                &request.request_id,
2208            ),
2209        ))
2210    }
2211}
2212
/// Returns the request parameter `name` if present.
///
/// Thin local alias for `fakecloud_core::query::optional_query_param`, so the
/// handlers in this file can use a short name.
fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
    fakecloud_core::query::optional_query_param(req, name)
}
2216
/// Returns the request parameter `name`, or the error produced by
/// `fakecloud_core::query::required_query_param` when it cannot be obtained.
///
/// Thin local alias so the handlers in this file can use a short name.
fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
    fakecloud_core::query::required_query_param(req, name)
}
2220
2221fn required_i32_param(req: &AwsRequest, name: &str) -> Result<i32, AwsServiceError> {
2222    let value = required_param(req, name)?;
2223    value.parse::<i32>().map_err(|_| {
2224        AwsServiceError::aws_error(
2225            StatusCode::BAD_REQUEST,
2226            "InvalidParameterValue",
2227            format!("Parameter {name} must be a valid integer."),
2228        )
2229    })
2230}
2231
2232fn optional_i32_param(req: &AwsRequest, name: &str) -> Result<Option<i32>, AwsServiceError> {
2233    optional_param(req, name)
2234        .map(|value| {
2235            value.parse::<i32>().map_err(|_| {
2236                AwsServiceError::aws_error(
2237                    StatusCode::BAD_REQUEST,
2238                    "InvalidParameterValue",
2239                    format!("Parameter {name} must be a valid integer."),
2240                )
2241            })
2242        })
2243        .transpose()
2244}
2245
2246fn parse_tags(req: &AwsRequest) -> Result<Vec<RdsTag>, AwsServiceError> {
2247    let mut tags = Vec::new();
2248    for index in 1.. {
2249        let key_name = format!("Tags.Tag.{index}.Key");
2250        let value_name = format!("Tags.Tag.{index}.Value");
2251        let key = optional_param(req, &key_name);
2252        let value = optional_param(req, &value_name);
2253
2254        match (key, value) {
2255            (Some(key), Some(value)) => tags.push(RdsTag { key, value }),
2256            (None, None) => break,
2257            _ => {
2258                return Err(AwsServiceError::aws_error(
2259                    StatusCode::BAD_REQUEST,
2260                    "InvalidParameterValue",
2261                    "Each tag must include both Key and Value.",
2262                ));
2263            }
2264        }
2265    }
2266
2267    Ok(tags)
2268}
2269
2270fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
2271    let mut keys = Vec::new();
2272    for index in 1.. {
2273        let key_name = format!("TagKeys.member.{index}");
2274        match optional_param(req, &key_name) {
2275            Some(key) => keys.push(key),
2276            None => break,
2277        }
2278    }
2279
2280    Ok(keys)
2281}
2282
2283fn parse_subnet_ids(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
2284    let mut subnet_ids = Vec::new();
2285    for index in 1.. {
2286        let subnet_id_name = format!("SubnetIds.SubnetIdentifier.{index}");
2287        match optional_param(req, &subnet_id_name) {
2288            Some(subnet_id) => subnet_ids.push(subnet_id),
2289            None => break,
2290        }
2291    }
2292
2293    Ok(subnet_ids)
2294}
2295
2296fn parse_vpc_security_group_ids(req: &AwsRequest) -> Vec<String> {
2297    let mut security_group_ids = Vec::new();
2298    for index in 1.. {
2299        let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
2300        match optional_param(req, &sg_id_name) {
2301            Some(sg_id) => security_group_ids.push(sg_id),
2302            None => break,
2303        }
2304    }
2305
2306    // If no security groups provided, return a default one
2307    if security_group_ids.is_empty() {
2308        security_group_ids.push("sg-default".to_string());
2309    }
2310
2311    security_group_ids
2312}
2313
/// Returns `true` when at least one key in `req.query_params` starts with
/// `prefix`.
fn query_param_prefix_exists(req: &AwsRequest, prefix: &str) -> bool {
    req.query_params.keys().any(|key| key.starts_with(prefix))
}
2317
2318fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
2319    value
2320        .map(|raw| match raw {
2321            "true" | "True" | "TRUE" => Ok(true),
2322            "false" | "False" | "FALSE" => Ok(false),
2323            _ => Err(AwsServiceError::aws_error(
2324                StatusCode::BAD_REQUEST,
2325                "InvalidParameterValue",
2326                format!("Boolean parameter value '{raw}' is invalid."),
2327            )),
2328        })
2329        .transpose()
2330}
2331
/// One page of results, as produced by `paginate`.
struct PaginationResult<T> {
    /// The items that belong to this page, in their original order.
    items: Vec<T>,
    /// Base64-encoded identifier of the last item on this page, set only when
    /// more items follow; `None` on the final page.
    next_marker: Option<String>,
}
2336
2337fn paginate<T, F>(
2338    mut items: Vec<T>,
2339    marker: Option<String>,
2340    max_records: Option<String>,
2341    get_id: F,
2342) -> Result<PaginationResult<T>, AwsServiceError>
2343where
2344    F: Fn(&T) -> &str,
2345{
2346    // Parse max_records with default 100, max 100
2347    let max = if let Some(max_str) = max_records {
2348        let parsed = max_str.parse::<i32>().map_err(|_| {
2349            AwsServiceError::aws_error(
2350                StatusCode::BAD_REQUEST,
2351                "InvalidParameterValue",
2352                "MaxRecords must be a valid integer.",
2353            )
2354        })?;
2355        if !(1..=100).contains(&parsed) {
2356            return Err(AwsServiceError::aws_error(
2357                StatusCode::BAD_REQUEST,
2358                "InvalidParameterValue",
2359                "MaxRecords must be between 1 and 100.",
2360            ));
2361        }
2362        parsed as usize
2363    } else {
2364        100
2365    };
2366
2367    // Decode marker to get starting identifier
2368    let start_id = if let Some(encoded_marker) = marker {
2369        let decoded = BASE64.decode(encoded_marker.as_bytes()).map_err(|_| {
2370            AwsServiceError::aws_error(
2371                StatusCode::BAD_REQUEST,
2372                "InvalidParameterValue",
2373                "Marker is invalid.",
2374            )
2375        })?;
2376        let id = String::from_utf8(decoded).map_err(|_| {
2377            AwsServiceError::aws_error(
2378                StatusCode::BAD_REQUEST,
2379                "InvalidParameterValue",
2380                "Marker is invalid.",
2381            )
2382        })?;
2383        Some(id)
2384    } else {
2385        None
2386    };
2387
2388    // Find starting position
2389    let start_index = if let Some(ref start_id) = start_id {
2390        items
2391            .iter()
2392            .position(|item| get_id(item) == start_id)
2393            .map(|pos| pos + 1) // Start after the marker
2394            .unwrap_or(items.len()) // If not found, return empty result
2395    } else {
2396        0
2397    };
2398
2399    // Take items from start_index
2400    let total_items = items.len();
2401    let end_index = std::cmp::min(start_index + max, total_items);
2402    let paginated_items: Vec<T> = items.drain(start_index..end_index).collect();
2403
2404    // Create next marker if there are more items
2405    let next_marker = if end_index < total_items {
2406        paginated_items
2407            .last()
2408            .map(|item| BASE64.encode(get_id(item).as_bytes()))
2409    } else {
2410        None
2411    };
2412
2413    Ok(PaginationResult {
2414        items: paginated_items,
2415        next_marker,
2416    })
2417}
2418
2419fn validate_create_request(
2420    db_instance_identifier: &str,
2421    allocated_storage: i32,
2422    db_instance_class: &str,
2423    engine: &str,
2424    engine_version: &str,
2425    port: i32,
2426) -> Result<(), AwsServiceError> {
2427    if allocated_storage <= 0 {
2428        return Err(AwsServiceError::aws_error(
2429            StatusCode::BAD_REQUEST,
2430            "InvalidParameterValue",
2431            "AllocatedStorage must be greater than zero.",
2432        ));
2433    }
2434    if port <= 0 {
2435        return Err(AwsServiceError::aws_error(
2436            StatusCode::BAD_REQUEST,
2437            "InvalidParameterValue",
2438            "Port must be greater than zero.",
2439        ));
2440    }
2441    if !db_instance_identifier
2442        .chars()
2443        .all(|ch| ch.is_ascii_alphanumeric() || ch == '-')
2444    {
2445        return Err(AwsServiceError::aws_error(
2446            StatusCode::BAD_REQUEST,
2447            "InvalidParameterValue",
2448            "DBInstanceIdentifier must contain only alphanumeric characters or hyphens.",
2449        ));
2450    }
2451    // Validate engine
2452    let supported_engines = [
2453        "postgres",
2454        "mysql",
2455        "mariadb",
2456        "oracle-ee",
2457        "oracle-se2",
2458        "oracle-ee-cdb",
2459        "oracle-se2-cdb",
2460        "sqlserver-ee",
2461        "sqlserver-se",
2462        "sqlserver-ex",
2463        "sqlserver-web",
2464        "db2-se",
2465        "db2-ae",
2466    ];
2467    if !supported_engines.contains(&engine) {
2468        return Err(AwsServiceError::aws_error(
2469            StatusCode::BAD_REQUEST,
2470            "InvalidParameterValue",
2471            format!("Engine '{}' is not supported.", engine),
2472        ));
2473    }
2474
2475    // Validate engine version. The Oracle/SQL Server/Db2 lists track
2476    // the major-minor versions actually shipped by the upstream
2477    // dev-edition images (gvenzl/oracle-free 23, mssql-server 2022,
2478    // db2_community 11.5). Adding a new version here also requires
2479    // wiring the image tag in `RdsRuntime::ensure_postgres`.
2480    // Major versions ("8.0", "10.11", ...) are accepted alongside the
2481    // full `<major>.<minor>.<patch>` triplets — AWS RDS validates both
2482    // forms and the runtime resolves the matching prebuilt image regardless.
2483    let supported_versions = match engine {
2484        "postgres" => vec!["16", "15", "14", "13", "16.3", "15.5", "14.10", "13.13"],
2485        "mysql" => vec!["8.0", "8.0.35", "8.0.28", "5.7.44"],
2486        "mariadb" => vec!["10.6", "10.11", "11.4", "11.4.5", "10.11.6", "10.6.16"],
2487        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
2488            vec!["23.0.0", "21.0.0", "19.0.0"]
2489        }
2490        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
2491            vec!["16.00.4085.2.v1", "15.00.4322.2.v1"]
2492        }
2493        "db2-se" | "db2-ae" => vec!["11.5.9.0.sb00000000.r1", "11.5.8.0.sb00000000.r1"],
2494        _ => vec![],
2495    };
2496
2497    if !supported_versions.contains(&engine_version) {
2498        return Err(AwsServiceError::aws_error(
2499            StatusCode::BAD_REQUEST,
2500            "InvalidParameterValue",
2501            format!("EngineVersion '{engine_version}' is not supported yet."),
2502        ));
2503    }
2504    validate_db_instance_class(db_instance_class)?;
2505    Ok(())
2506}
2507
2508fn validate_db_instance_class(db_instance_class: &str) -> Result<(), AwsServiceError> {
2509    if !crate::state::SUPPORTED_INSTANCE_CLASSES.contains(&db_instance_class) {
2510        return Err(AwsServiceError::aws_error(
2511            StatusCode::BAD_REQUEST,
2512            "InvalidParameterValue",
2513            format!("DBInstanceClass '{}' is not supported.", db_instance_class),
2514        ));
2515    }
2516    Ok(())
2517}
2518
2519fn filter_engine_versions(
2520    versions: &[EngineVersionInfo],
2521    engine: &Option<String>,
2522    engine_version: &Option<String>,
2523    family: &Option<String>,
2524) -> Vec<EngineVersionInfo> {
2525    versions
2526        .iter()
2527        .filter(|candidate| {
2528            engine
2529                .as_ref()
2530                .is_none_or(|expected| candidate.engine == *expected)
2531        })
2532        .filter(|candidate| {
2533            engine_version
2534                .as_ref()
2535                .is_none_or(|expected| candidate.engine_version == *expected)
2536        })
2537        .filter(|candidate| {
2538            family
2539                .as_ref()
2540                .is_none_or(|expected| candidate.db_parameter_group_family == *expected)
2541        })
2542        .cloned()
2543        .collect()
2544}
2545
2546fn filter_orderable_options(
2547    options: &[OrderableDbInstanceOption],
2548    engine: &Option<String>,
2549    engine_version: &Option<String>,
2550    db_instance_class: &Option<String>,
2551    license_model: &Option<String>,
2552    vpc: Option<bool>,
2553) -> Vec<OrderableDbInstanceOption> {
2554    options
2555        .iter()
2556        .filter(|candidate| {
2557            engine
2558                .as_ref()
2559                .is_none_or(|expected| candidate.engine == *expected)
2560        })
2561        .filter(|candidate| {
2562            engine_version
2563                .as_ref()
2564                .is_none_or(|expected| candidate.engine_version == *expected)
2565        })
2566        .filter(|candidate| {
2567            db_instance_class
2568                .as_ref()
2569                .is_none_or(|expected| candidate.db_instance_class == *expected)
2570        })
2571        .filter(|candidate| {
2572            license_model
2573                .as_ref()
2574                .is_none_or(|expected| candidate.license_model == *expected)
2575        })
2576        .filter(|_| vpc.unwrap_or(true))
2577        .cloned()
2578        .collect()
2579}
2580
2581/// Build a `DbInstance` for a newly-created read replica, copying the
2582/// source instance's physical attributes and binding the replica's
2583/// identifier, ARN, resource id, container id and host port.
2584#[allow(clippy::too_many_arguments)]
2585/// Build a `DbInstance` from a restored snapshot. Copies the physical
2586/// attributes off the snapshot and binds the new instance's identifier,
2587/// ARN, resource id, container id and host port.
2588fn build_restored_instance(
2589    db_instance_identifier: &str,
2590    db_instance_arn: String,
2591    dbi_resource_id: String,
2592    created_at: chrono::DateTime<Utc>,
2593    vpc_security_group_ids: Vec<String>,
2594    snapshot: &DbSnapshot,
2595    running: &crate::runtime::RunningDbContainer,
2596) -> DbInstance {
2597    DbInstance {
2598        db_instance_identifier: db_instance_identifier.to_string(),
2599        db_instance_arn,
2600        db_instance_class: "db.t3.micro".to_string(),
2601        engine: snapshot.engine.clone(),
2602        engine_version: snapshot.engine_version.clone(),
2603        db_instance_status: "available".to_string(),
2604        master_username: snapshot.master_username.clone(),
2605        db_name: snapshot.db_name.clone(),
2606        endpoint_address: "127.0.0.1".to_string(),
2607        port: i32::from(running.host_port),
2608        allocated_storage: snapshot.allocated_storage,
2609        publicly_accessible: true,
2610        deletion_protection: false,
2611        created_at,
2612        dbi_resource_id,
2613        master_user_password: snapshot.master_user_password.clone(),
2614        container_id: running.container_id.clone(),
2615        host_port: running.host_port,
2616        tags: Vec::new(),
2617        read_replica_source_db_instance_identifier: None,
2618        read_replica_db_instance_identifiers: Vec::new(),
2619        vpc_security_group_ids,
2620        db_parameter_group_name: None,
2621        backup_retention_period: 1,
2622        preferred_backup_window: "03:00-04:00".to_string(),
2623        latest_restorable_time: Some(created_at),
2624        option_group_name: None,
2625        multi_az: false,
2626        pending_modified_values: None,
2627    }
2628}
2629
2630fn build_read_replica_instance(
2631    db_instance_identifier: &str,
2632    db_instance_arn: String,
2633    dbi_resource_id: String,
2634    created_at: chrono::DateTime<Utc>,
2635    source_db_instance_identifier: &str,
2636    source: &DbInstance,
2637    running: &crate::runtime::RunningDbContainer,
2638) -> DbInstance {
2639    DbInstance {
2640        db_instance_identifier: db_instance_identifier.to_string(),
2641        db_instance_arn,
2642        db_instance_class: source.db_instance_class.clone(),
2643        engine: source.engine.clone(),
2644        engine_version: source.engine_version.clone(),
2645        db_instance_status: "available".to_string(),
2646        master_username: source.master_username.clone(),
2647        db_name: source.db_name.clone(),
2648        endpoint_address: "127.0.0.1".to_string(),
2649        port: i32::from(running.host_port),
2650        allocated_storage: source.allocated_storage,
2651        publicly_accessible: source.publicly_accessible,
2652        deletion_protection: false,
2653        created_at,
2654        dbi_resource_id,
2655        master_user_password: source.master_user_password.clone(),
2656        container_id: running.container_id.clone(),
2657        host_port: running.host_port,
2658        tags: Vec::new(),
2659        read_replica_source_db_instance_identifier: Some(source_db_instance_identifier.to_string()),
2660        read_replica_db_instance_identifiers: Vec::new(),
2661        vpc_security_group_ids: source.vpc_security_group_ids.clone(),
2662        db_parameter_group_name: source.db_parameter_group_name.clone(),
2663        backup_retention_period: source.backup_retention_period,
2664        preferred_backup_window: source.preferred_backup_window.clone(),
2665        latest_restorable_time: if source.backup_retention_period > 0 {
2666            Some(created_at)
2667        } else {
2668            None
2669        },
2670        option_group_name: source.option_group_name.clone(),
2671        multi_az: source.multi_az,
2672        pending_modified_values: None,
2673    }
2674}
2675
2676fn xml_wrap(action: &str, inner: &str, request_id: &str) -> String {
2677    fakecloud_core::query::query_response_xml(action, RDS_NS, inner, request_id)
2678}
2679
2680fn engine_version_xml(version: &EngineVersionInfo) -> String {
2681    format!(
2682        "<DBEngineVersion>\
2683         <Engine>{}</Engine>\
2684         <EngineVersion>{}</EngineVersion>\
2685         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2686         <DBEngineDescription>{}</DBEngineDescription>\
2687         <DBEngineVersionDescription>{}</DBEngineVersionDescription>\
2688         <Status>{}</Status>\
2689         </DBEngineVersion>",
2690        xml_escape(&version.engine),
2691        xml_escape(&version.engine_version),
2692        xml_escape(&version.db_parameter_group_family),
2693        xml_escape(&version.db_engine_description),
2694        xml_escape(&version.db_engine_version_description),
2695        xml_escape(&version.status),
2696    )
2697}
2698
2699fn orderable_option_xml(option: &OrderableDbInstanceOption) -> String {
2700    format!(
2701        "<OrderableDBInstanceOption>\
2702         <Engine>{}</Engine>\
2703         <EngineVersion>{}</EngineVersion>\
2704         <DBInstanceClass>{}</DBInstanceClass>\
2705         <LicenseModel>{}</LicenseModel>\
2706         <AvailabilityZones><AvailabilityZone><Name>us-east-1a</Name></AvailabilityZone></AvailabilityZones>\
2707         <MultiAZCapable>true</MultiAZCapable>\
2708         <ReadReplicaCapable>true</ReadReplicaCapable>\
2709         <Vpc>true</Vpc>\
2710         <SupportsStorageEncryption>true</SupportsStorageEncryption>\
2711         <StorageType>{}</StorageType>\
2712         <SupportsIops>false</SupportsIops>\
2713         <MinStorageSize>{}</MinStorageSize>\
2714         <MaxStorageSize>{}</MaxStorageSize>\
2715         <SupportsIAMDatabaseAuthentication>true</SupportsIAMDatabaseAuthentication>\
2716         </OrderableDBInstanceOption>",
2717        xml_escape(&option.engine),
2718        xml_escape(&option.engine_version),
2719        xml_escape(&option.db_instance_class),
2720        xml_escape(&option.license_model),
2721        xml_escape(&option.storage_type),
2722        option.min_storage_size,
2723        option.max_storage_size,
2724    )
2725}
2726
2727fn tag_xml(tag: &RdsTag) -> String {
2728    format!(
2729        "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2730        xml_escape(&tag.key),
2731        xml_escape(&tag.value),
2732    )
2733}
2734
2735/// Free-standing version of `emit_event` so background tasks (which
2736/// don't have a `&self`) can publish RDS events through the same path.
2737pub(crate) fn emit_event_static(
2738    delivery_bus: Option<&Arc<DeliveryBus>>,
2739    source_type: RdsSourceType,
2740    source_identifier: &str,
2741    source_arn: &str,
2742    event_id: &str,
2743    event_categories: &[&str],
2744    message: &str,
2745) {
2746    let Some(bus) = delivery_bus else {
2747        return;
2748    };
2749    let detail = serde_json::json!({
2750        "EventCategories": event_categories,
2751        "SourceType": source_type.as_str(),
2752        "SourceArn": source_arn,
2753        "Date": Utc::now().to_rfc3339(),
2754        "Message": message,
2755        "SourceIdentifier": source_identifier,
2756        "EventID": event_id,
2757    });
2758    bus.put_event_to_eventbridge(
2759        "aws.rds",
2760        source_type.detail_type(),
2761        &detail.to_string(),
2762        "default",
2763    );
2764}
2765
2766fn db_instance_xml(instance: &DbInstance, status_override: Option<&str>) -> String {
2767    let status = status_override.unwrap_or(&instance.db_instance_status);
2768    let db_name_xml = instance
2769        .db_name
2770        .as_ref()
2771        .map(|db_name| format!("<DBName>{}</DBName>", xml_escape(db_name)))
2772        .unwrap_or_default();
2773
2774    let read_replica_source_xml = instance
2775        .read_replica_source_db_instance_identifier
2776        .as_ref()
2777        .map(|source| {
2778            format!(
2779                "<ReadReplicaSourceDBInstanceIdentifier>{}</ReadReplicaSourceDBInstanceIdentifier>",
2780                xml_escape(source)
2781            )
2782        })
2783        .unwrap_or_default();
2784
2785    let read_replica_identifiers_xml = if instance.read_replica_db_instance_identifiers.is_empty() {
2786        "<ReadReplicaDBInstanceIdentifiers/>".to_string()
2787    } else {
2788        format!(
2789            "<ReadReplicaDBInstanceIdentifiers>{}</ReadReplicaDBInstanceIdentifiers>",
2790            instance
2791                .read_replica_db_instance_identifiers
2792                .iter()
2793                .map(|id| format!(
2794                    "<ReadReplicaDBInstanceIdentifier>{}</ReadReplicaDBInstanceIdentifier>",
2795                    xml_escape(id)
2796                ))
2797                .collect::<String>()
2798        )
2799    };
2800
2801    let vpc_security_groups_xml = if instance.vpc_security_group_ids.is_empty() {
2802        "<VpcSecurityGroups/>".to_string()
2803    } else {
2804        format!(
2805            "<VpcSecurityGroups>{}</VpcSecurityGroups>",
2806            instance
2807                .vpc_security_group_ids
2808                .iter()
2809                .map(|sg_id| format!(
2810                    "<VpcSecurityGroupMembership>\
2811                     <VpcSecurityGroupId>{}</VpcSecurityGroupId>\
2812                     <Status>active</Status>\
2813                     </VpcSecurityGroupMembership>",
2814                    xml_escape(sg_id)
2815                ))
2816                .collect::<String>()
2817        )
2818    };
2819
2820    let db_parameter_groups_xml = match &instance.db_parameter_group_name {
2821        Some(pg_name) => format!(
2822            "<DBParameterGroups>\
2823             <DBParameterGroup>\
2824             <DBParameterGroupName>{}</DBParameterGroupName>\
2825             <ParameterApplyStatus>in-sync</ParameterApplyStatus>\
2826             </DBParameterGroup>\
2827             </DBParameterGroups>",
2828            xml_escape(pg_name)
2829        ),
2830        None => "<DBParameterGroups/>".to_string(),
2831    };
2832
2833    let option_group_memberships_xml = match &instance.option_group_name {
2834        Some(og_name) => format!(
2835            "<OptionGroupMemberships>\
2836             <OptionGroupMembership>\
2837             <OptionGroupName>{}</OptionGroupName>\
2838             <Status>in-sync</Status>\
2839             </OptionGroupMembership>\
2840             </OptionGroupMemberships>",
2841            xml_escape(og_name)
2842        ),
2843        None => "<OptionGroupMemberships/>".to_string(),
2844    };
2845
2846    let pending_modified_values_xml = if let Some(ref pending) = instance.pending_modified_values {
2847        let mut fields = Vec::new();
2848        if let Some(ref class) = pending.db_instance_class {
2849            fields.push(format!(
2850                "<DBInstanceClass>{}</DBInstanceClass>",
2851                xml_escape(class)
2852            ));
2853        }
2854        if let Some(allocated_storage) = pending.allocated_storage {
2855            fields.push(format!(
2856                "<AllocatedStorage>{}</AllocatedStorage>",
2857                allocated_storage
2858            ));
2859        }
2860        if let Some(backup_retention_period) = pending.backup_retention_period {
2861            fields.push(format!(
2862                "<BackupRetentionPeriod>{}</BackupRetentionPeriod>",
2863                backup_retention_period
2864            ));
2865        }
2866        if let Some(multi_az) = pending.multi_az {
2867            fields.push(format!(
2868                "<MultiAZ>{}</MultiAZ>",
2869                if multi_az { "true" } else { "false" }
2870            ));
2871        }
2872        if let Some(ref engine_version) = pending.engine_version {
2873            fields.push(format!(
2874                "<EngineVersion>{}</EngineVersion>",
2875                xml_escape(engine_version)
2876            ));
2877        }
2878        if pending.master_user_password.is_some() {
2879            fields.push("<MasterUserPassword>****</MasterUserPassword>".to_string());
2880        }
2881        if !fields.is_empty() {
2882            format!(
2883                "<PendingModifiedValues>{}</PendingModifiedValues>",
2884                fields.join("")
2885            )
2886        } else {
2887            String::new()
2888        }
2889    } else {
2890        String::new()
2891    };
2892
2893    let latest_restorable_time_xml = instance
2894        .latest_restorable_time
2895        .map(|t| {
2896            format!(
2897                "<LatestRestorableTime>{}</LatestRestorableTime>",
2898                t.to_rfc3339()
2899            )
2900        })
2901        .unwrap_or_default();
2902
2903    // Endpoint is suppressed while the container is still coming up so
2904    // SDK callers don't try to dial an empty host:0. Once the background
2905    // task fills in `endpoint_address` and `port`, DescribeDBInstances
2906    // returns the real endpoint.
2907    let endpoint_xml = if instance.endpoint_address.is_empty() || instance.port == 0 {
2908        String::new()
2909    } else {
2910        format!(
2911            "<Endpoint><Address>{}</Address><Port>{}</Port></Endpoint>",
2912            xml_escape(&instance.endpoint_address),
2913            instance.port
2914        )
2915    };
2916
2917    format!(
2918        "<DBInstanceIdentifier>{identifier}</DBInstanceIdentifier>\
2919         <DBInstanceClass>{class}</DBInstanceClass>\
2920         <Engine>{engine}</Engine>\
2921         <DBInstanceStatus>{status}</DBInstanceStatus>\
2922         <MasterUsername>{master_username}</MasterUsername>\
2923         {db_name_xml}\
2924         {endpoint_xml}\
2925         <AllocatedStorage>{allocated_storage}</AllocatedStorage>\
2926         <InstanceCreateTime>{create_time}</InstanceCreateTime>\
2927         <PreferredBackupWindow>{preferred_backup_window}</PreferredBackupWindow>\
2928         <BackupRetentionPeriod>{backup_retention_period}</BackupRetentionPeriod>\
2929         <DBSecurityGroups/>\
2930         {vpc_security_groups_xml}\
2931         {db_parameter_groups_xml}\
2932         <AvailabilityZone>us-east-1a</AvailabilityZone>\
2933         {latest_restorable_time_xml}\
2934         <PreferredMaintenanceWindow>sun:00:00-sun:00:30</PreferredMaintenanceWindow>\
2935         <MultiAZ>{multi_az}</MultiAZ>\
2936         <EngineVersion>{engine_version}</EngineVersion>\
2937         <AutoMinorVersionUpgrade>true</AutoMinorVersionUpgrade>\
2938         {read_replica_identifiers_xml}\
2939         {read_replica_source_xml}\
2940         <LicenseModel>{license_model}</LicenseModel>\
2941         {option_group_memberships_xml}\
2942         <PubliclyAccessible>{publicly_accessible}</PubliclyAccessible>\
2943         <StorageType>gp2</StorageType>\
2944         <DbInstancePort>{port}</DbInstancePort>\
2945         <StorageEncrypted>false</StorageEncrypted>\
2946         <DbiResourceId>{dbi_resource_id}</DbiResourceId>\
2947         <DeletionProtection>{deletion_protection}</DeletionProtection>\
2948         {pending_modified_values_xml}\
2949         <DBInstanceArn>{arn}</DBInstanceArn>",
2950        identifier = xml_escape(&instance.db_instance_identifier),
2951        class = xml_escape(&instance.db_instance_class),
2952        engine = xml_escape(&instance.engine),
2953        status = xml_escape(status),
2954        master_username = xml_escape(&instance.master_username),
2955        port = instance.port,
2956        allocated_storage = instance.allocated_storage,
2957        create_time = instance.created_at.to_rfc3339(),
2958        preferred_backup_window = xml_escape(&instance.preferred_backup_window),
2959        backup_retention_period = instance.backup_retention_period,
2960        multi_az = if instance.multi_az { "true" } else { "false" },
2961        engine_version = xml_escape(&instance.engine_version),
2962        license_model = license_model_for_engine(&instance.engine),
2963        publicly_accessible = if instance.publicly_accessible {
2964            "true"
2965        } else {
2966            "false"
2967        },
2968        dbi_resource_id = xml_escape(&instance.dbi_resource_id),
2969        deletion_protection = if instance.deletion_protection {
2970            "true"
2971        } else {
2972            "false"
2973        },
2974        arn = xml_escape(&instance.db_instance_arn),
2975    )
2976}
2977
2978fn db_snapshot_xml(snapshot: &DbSnapshot) -> String {
2979    format!(
2980        "<DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\
2981         <DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2982         <SnapshotCreateTime>{}</SnapshotCreateTime>\
2983         <Engine>{}</Engine>\
2984         <EngineVersion>{}</EngineVersion>\
2985         <AllocatedStorage>{}</AllocatedStorage>\
2986         <Status>{}</Status>\
2987         <Port>{}</Port>\
2988         <MasterUsername>{}</MasterUsername>\
2989         {}\
2990         <DbiResourceId>{}</DbiResourceId>\
2991         <SnapshotType>{}</SnapshotType>\
2992         <DBSnapshotArn>{}</DBSnapshotArn>",
2993        xml_escape(&snapshot.db_snapshot_identifier),
2994        xml_escape(&snapshot.db_instance_identifier),
2995        snapshot.snapshot_create_time.to_rfc3339(),
2996        xml_escape(&snapshot.engine),
2997        xml_escape(&snapshot.engine_version),
2998        snapshot.allocated_storage,
2999        xml_escape(&snapshot.status),
3000        snapshot.port,
3001        xml_escape(&snapshot.master_username),
3002        snapshot
3003            .db_name
3004            .as_ref()
3005            .map(|name| format!("<DBName>{}</DBName>", xml_escape(name)))
3006            .unwrap_or_default(),
3007        xml_escape(&snapshot.dbi_resource_id),
3008        xml_escape(&snapshot.snapshot_type),
3009        xml_escape(&snapshot.db_snapshot_arn),
3010    )
3011}
3012
3013fn db_subnet_group_xml(subnet_group: &DbSubnetGroup) -> String {
3014    let subnets_xml = subnet_group
3015        .subnet_ids
3016        .iter()
3017        .zip(&subnet_group.subnet_availability_zones)
3018        .map(|(subnet_id, az)| {
3019            format!(
3020                "<Subnet>\
3021                 <SubnetIdentifier>{}</SubnetIdentifier>\
3022                 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
3023                 <SubnetStatus>Active</SubnetStatus>\
3024                 </Subnet>",
3025                xml_escape(subnet_id),
3026                xml_escape(az)
3027            )
3028        })
3029        .collect::<String>();
3030
3031    format!(
3032        "<DBSubnetGroupName>{}</DBSubnetGroupName>\
3033         <DBSubnetGroupDescription>{}</DBSubnetGroupDescription>\
3034         <VpcId>{}</VpcId>\
3035         <SubnetGroupStatus>Complete</SubnetGroupStatus>\
3036         <Subnets>{}</Subnets>\
3037         <DBSubnetGroupArn>{}</DBSubnetGroupArn>",
3038        xml_escape(&subnet_group.db_subnet_group_name),
3039        xml_escape(&subnet_group.db_subnet_group_description),
3040        xml_escape(&subnet_group.vpc_id),
3041        subnets_xml,
3042        xml_escape(&subnet_group.db_subnet_group_arn),
3043    )
3044}
3045
3046fn db_parameter_group_xml(parameter_group: &DbParameterGroup) -> String {
3047    format!(
3048        "<DBParameterGroupName>{}</DBParameterGroupName>\
3049         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
3050         <Description>{}</Description>\
3051         <DBParameterGroupArn>{}</DBParameterGroupArn>",
3052        xml_escape(&parameter_group.db_parameter_group_name),
3053        xml_escape(&parameter_group.db_parameter_group_family),
3054        xml_escape(&parameter_group.description),
3055        xml_escape(&parameter_group.db_parameter_group_arn),
3056    )
3057}
3058
3059fn db_instance_not_found(identifier: &str) -> AwsServiceError {
3060    AwsServiceError::aws_error(
3061        StatusCode::NOT_FOUND,
3062        "DBInstanceNotFound",
3063        format!("DBInstance {} not found.", identifier),
3064    )
3065}
3066
3067fn db_snapshot_not_found(identifier: &str) -> AwsServiceError {
3068    AwsServiceError::aws_error(
3069        StatusCode::NOT_FOUND,
3070        "DBSnapshotNotFound",
3071        format!("DBSnapshot {} not found.", identifier),
3072    )
3073}
3074
3075fn db_instance_not_found_by_arn(resource_name: &str) -> AwsServiceError {
3076    AwsServiceError::aws_error(
3077        StatusCode::NOT_FOUND,
3078        "DBInstanceNotFound",
3079        format!("DBInstance {resource_name} not found."),
3080    )
3081}
3082
3083fn find_instance_by_arn<'a>(
3084    state: &'a crate::state::RdsState,
3085    resource_name: &str,
3086) -> Result<&'a DbInstance, AwsServiceError> {
3087    state
3088        .instances
3089        .values()
3090        .find(|instance| instance.db_instance_arn == resource_name)
3091        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
3092}
3093
3094fn find_instance_by_arn_mut<'a>(
3095    state: &'a mut crate::state::RdsState,
3096    resource_name: &str,
3097) -> Result<&'a mut DbInstance, AwsServiceError> {
3098    state
3099        .instances
3100        .values_mut()
3101        .find(|instance| instance.db_instance_arn == resource_name)
3102        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
3103}
3104
3105fn merge_tags(existing: &mut Vec<RdsTag>, incoming: &[RdsTag]) {
3106    for tag in incoming {
3107        if let Some(existing_tag) = existing
3108            .iter_mut()
3109            .find(|candidate| candidate.key == tag.key)
3110        {
3111            existing_tag.value = tag.value.clone();
3112        } else {
3113            existing.push(tag.clone());
3114        }
3115    }
3116}
3117
/// Return the license model string reported for `engine`.
///
/// MySQL and MariaDB report `general-public-license`. Oracle and SQL
/// Server both use the BYOL/license-included split; fakecloud reports
/// `license-included` since the upstream dev-edition images are
/// free-to-use. Db2 reports `bring-your-own-license` to mirror AWS's RDS
/// for Db2 default. Any other engine name falls back to
/// `postgresql-license`.
fn license_model_for_engine(engine: &str) -> &'static str {
    if matches!(engine, "mysql" | "mariadb") {
        "general-public-license"
    } else if matches!(engine, "db2-se" | "db2-ae") {
        "bring-your-own-license"
    } else if matches!(
        engine,
        "oracle-ee"
            | "oracle-se2"
            | "oracle-ee-cdb"
            | "oracle-se2-cdb"
            | "sqlserver-ee"
            | "sqlserver-se"
            | "sqlserver-ex"
            | "sqlserver-web"
    ) {
        "license-included"
    } else {
        "postgresql-license"
    }
}
3132
/// Return the database name used for `engine` when the caller supplies
/// none. Unrecognised engines fall back to `postgres`.
fn default_db_name(engine: &str) -> &'static str {
    if matches!(engine, "mysql" | "mariadb") {
        return "mysql";
    }
    // Oracle's gvenzl image creates an `ORACLE_DATABASE` alongside
    // the built-in FREEPDB1 — keep `ORCL` as the default name to
    // match what AWS RDS for Oracle returns when you don't pass
    // `DBName`.
    if matches!(
        engine,
        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb"
    ) {
        return "ORCL";
    }
    // SQL Server installs system DBs by default; AWS doesn't
    // create a user DB unless `DBName` is supplied. Use `master`
    // as the default the SDK can connect to.
    if matches!(
        engine,
        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web"
    ) {
        return "master";
    }
    if matches!(engine, "db2-se" | "db2-ae") {
        return "BLUDB";
    }
    "postgres"
}
3149
/// Return the default listener port for a freshly-created instance of
/// `engine`.
///
/// Mirrors the AWS RDS defaults so client SDKs that connect without an
/// explicit `--port` flag hit the right listener. PostgreSQL and any
/// unrecognised engine name both map to 5432.
fn default_port_for_engine(engine: &str) -> i32 {
    match engine {
        "mysql" | "mariadb" => 3306,
        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => 1521,
        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => 1433,
        "db2-se" | "db2-ae" => 50000,
        // "postgres" and every unknown engine share the PostgreSQL port.
        _ => 5432,
    }
}
3163
/// Derive the built-in parameter group name for a new instance that does
/// not name one explicitly.
///
/// The name encodes the engine family plus a version key taken from
/// `engine_version` (e.g. `default.postgres16`, `default.mysql8.0`,
/// `default.oracle-ee-23`, `default.sqlserver-ex-16`,
/// `default.db2-se-11.5`). Unrecognised engines get `default.postgres16`.
fn default_parameter_group(engine: &str, engine_version: &str) -> String {
    /// Text before the first `.` in `version`, or all of it when there is
    /// no `.`.
    fn leading_segment(version: &str) -> &str {
        match version.split_once('.') {
            Some((head, _)) => head,
            None => version,
        }
    }

    match engine {
        "postgres" => format!("default.postgres{}", leading_segment(engine_version)),
        "mysql" => {
            // Anything that is not a 5.7 release is treated as 8.0.
            let family = if engine_version.starts_with("5.7") {
                "5.7"
            } else {
                "8.0"
            };
            format!("default.mysql{}", family)
        }
        "mariadb" => {
            // Versions outside the 11.4 and 10.11 lines fall back to 10.6.
            let family = ["11.4", "10.11"]
                .into_iter()
                .find(|prefix| engine_version.starts_with(*prefix))
                .unwrap_or("10.6");
            format!("default.mariadb{}", family)
        }
        // Oracle and SQL Server both key on the first version segment;
        // for SQL Server that is the major-version number ("16" for 2022,
        // "15" for 2019).
        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
        | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
            let major = leading_segment(engine_version);
            format!("default.{engine}-{major}")
        }
        "db2-se" | "db2-ae" => {
            // Db2 ships major.minor as the parameter-group key
            // (e.g. `default.db2-se-11.5`); a missing minor defaults to 5.
            let (major, minor) = match engine_version.split_once('.') {
                Some((major, rest)) => (major, leading_segment(rest)),
                None => (engine_version, "5"),
            };
            format!("default.{engine}-{major}.{minor}")
        }
        _ => String::from("default.postgres16"),
    }
}
3214
3215fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
3216    match error {
3217        RuntimeError::Unavailable => AwsServiceError::aws_error(
3218            StatusCode::SERVICE_UNAVAILABLE,
3219            "InvalidParameterValue",
3220            "Docker/Podman is required for RDS DB instances but is not available",
3221        ),
3222        RuntimeError::ContainerStartFailed(message) => AwsServiceError::aws_error(
3223            StatusCode::INTERNAL_SERVER_ERROR,
3224            "InternalFailure",
3225            message,
3226        ),
3227    }
3228}
3229
3230#[cfg(test)]
3231mod tests {
3232    use std::collections::HashMap;
3233    use std::sync::Arc;
3234
3235    use bytes::Bytes;
3236    use chrono::Utc;
3237    use http::{HeaderMap, Method};
3238    use parking_lot::RwLock;
3239    use uuid::Uuid;
3240
3241    use super::{
3242        db_instance_xml, default_db_name, default_parameter_group, default_port_for_engine,
3243        filter_engine_versions, filter_orderable_options, license_model_for_engine, merge_tags,
3244        optional_i32_param, parse_tag_keys, parse_tags, validate_create_request, RdsService,
3245        RdsSourceType,
3246    };
3247    use crate::state::{default_engine_versions, default_orderable_options, DbInstance, RdsTag};
3248    use fakecloud_core::delivery::DeliveryBus;
3249    use fakecloud_core::service::{AwsRequest, AwsService, AwsServiceError};
3250
3251    #[test]
3252    fn default_port_matches_aws_for_each_engine() {
3253        assert_eq!(default_port_for_engine("postgres"), 5432);
3254        assert_eq!(default_port_for_engine("mysql"), 3306);
3255        assert_eq!(default_port_for_engine("mariadb"), 3306);
3256        assert_eq!(default_port_for_engine("oracle-ee"), 1521);
3257        assert_eq!(default_port_for_engine("oracle-se2"), 1521);
3258        assert_eq!(default_port_for_engine("sqlserver-ee"), 1433);
3259        assert_eq!(default_port_for_engine("sqlserver-ex"), 1433);
3260        assert_eq!(default_port_for_engine("db2-se"), 50000);
3261        assert_eq!(default_port_for_engine("db2-ae"), 50000);
3262    }
3263
3264    #[test]
3265    fn default_parameter_group_uses_engine_major_version() {
3266        assert_eq!(
3267            default_parameter_group("postgres", "16.3"),
3268            "default.postgres16"
3269        );
3270        assert_eq!(
3271            default_parameter_group("mysql", "8.0.35"),
3272            "default.mysql8.0"
3273        );
3274        assert_eq!(
3275            default_parameter_group("oracle-ee", "23.0.0"),
3276            "default.oracle-ee-23"
3277        );
3278        assert_eq!(
3279            default_parameter_group("sqlserver-ex", "16.00.4085.2.v1"),
3280            "default.sqlserver-ex-16"
3281        );
3282        assert_eq!(
3283            default_parameter_group("db2-se", "11.5.9.0.sb00000000.r1"),
3284            "default.db2-se-11.5"
3285        );
3286    }
3287
3288    #[test]
3289    fn license_model_reflects_engine_class() {
3290        assert_eq!(license_model_for_engine("postgres"), "postgresql-license");
3291        assert_eq!(license_model_for_engine("mysql"), "general-public-license");
3292        assert_eq!(license_model_for_engine("oracle-ee"), "license-included");
3293        assert_eq!(license_model_for_engine("sqlserver-se"), "license-included");
3294        assert_eq!(license_model_for_engine("db2-ae"), "bring-your-own-license");
3295    }
3296
3297    #[test]
3298    fn default_db_name_picks_per_engine_default() {
3299        assert_eq!(default_db_name("postgres"), "postgres");
3300        assert_eq!(default_db_name("mysql"), "mysql");
3301        assert_eq!(default_db_name("oracle-ee"), "ORCL");
3302        assert_eq!(default_db_name("sqlserver-ex"), "master");
3303        assert_eq!(default_db_name("db2-se"), "BLUDB");
3304    }
3305
3306    #[test]
3307    fn validate_create_request_accepts_new_engines() {
3308        for (engine, version, port) in [
3309            ("oracle-ee", "23.0.0", 1521),
3310            ("sqlserver-ex", "16.00.4085.2.v1", 1433),
3311            ("db2-se", "11.5.9.0.sb00000000.r1", 50000),
3312        ] {
3313            validate_create_request("test-db", 20, "db.t3.micro", engine, version, port)
3314                .expect("engine should be accepted");
3315        }
3316    }
3317
3318    #[test]
3319    fn validate_create_request_rejects_unsupported_engine_version() {
3320        let err =
3321            validate_create_request("test-db", 20, "db.t3.micro", "oracle-ee", "12.0.0", 1521)
3322                .expect_err("12.x is not in the supported list");
3323        let msg = format!("{err:?}");
3324        assert!(msg.contains("EngineVersion"), "unexpected: {msg}");
3325    }
3326
3327    #[test]
3328    fn filter_engine_versions_matches_requested_engine() {
3329        let versions = default_engine_versions();
3330
3331        let filtered =
3332            filter_engine_versions(&versions, &Some("postgres".to_string()), &None, &None);
3333
3334        assert_eq!(filtered.len(), 4); // All postgres versions
3335        assert!(filtered.iter().all(|v| v.engine == "postgres"));
3336    }
3337
3338    #[test]
3339    fn filter_orderable_options_respects_instance_class() {
3340        let options = default_orderable_options();
3341
3342        let filtered = filter_orderable_options(
3343            &options,
3344            &Some("postgres".to_string()),
3345            &Some("16.3".to_string()),
3346            &Some("db.t3.micro".to_string()),
3347            &None,
3348            Some(true),
3349        );
3350
3351        assert_eq!(filtered.len(), 1);
3352        assert_eq!(filtered[0].db_instance_class, "db.t3.micro");
3353    }
3354
3355    #[test]
3356    fn validate_create_request_rejects_unsupported_engine() {
3357        let error = validate_create_request("test-db", 20, "db.t3.micro", "mysql", "16.3", 5432)
3358            .expect_err("unsupported engine");
3359
3360        assert_eq!(error.code(), "InvalidParameterValue");
3361    }
3362
3363    #[test]
3364    fn optional_i32_param_rejects_invalid_integer() {
3365        let request = request("CreateDBInstance", &[("Port", "not-a-number")]);
3366
3367        let error = optional_i32_param(&request, "Port").expect_err("invalid port");
3368
3369        assert_eq!(error.code(), "InvalidParameterValue");
3370    }
3371
3372    #[test]
3373    fn db_instance_xml_renders_endpoint_and_status() {
3374        let created_at = Utc::now();
3375        let instance = DbInstance {
3376            db_instance_identifier: "test-db".to_string(),
3377            db_instance_arn: "arn:aws:rds:us-east-1:123456789012:db:test-db".to_string(),
3378            db_instance_class: "db.t3.micro".to_string(),
3379            engine: "postgres".to_string(),
3380            engine_version: "16.3".to_string(),
3381            db_instance_status: "available".to_string(),
3382            master_username: "admin".to_string(),
3383            db_name: Some("appdb".to_string()),
3384            endpoint_address: "127.0.0.1".to_string(),
3385            port: 15432,
3386            allocated_storage: 20,
3387            publicly_accessible: true,
3388            deletion_protection: false,
3389            created_at,
3390            dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3391            master_user_password: "secret123".to_string(),
3392            container_id: "container".to_string(),
3393            host_port: 15432,
3394            tags: Vec::new(),
3395            read_replica_source_db_instance_identifier: None,
3396            read_replica_db_instance_identifiers: Vec::new(),
3397            vpc_security_group_ids: vec!["sg-12345678".to_string()],
3398            db_parameter_group_name: Some("default.postgres16".to_string()),
3399            backup_retention_period: 1,
3400            preferred_backup_window: "03:00-04:00".to_string(),
3401            latest_restorable_time: Some(created_at),
3402            option_group_name: None,
3403            multi_az: false,
3404            pending_modified_values: None,
3405        };
3406
3407        let xml = db_instance_xml(&instance, Some("creating"));
3408
3409        assert!(xml.contains("<DBInstanceIdentifier>test-db</DBInstanceIdentifier>"));
3410        assert!(xml.contains("<DBInstanceStatus>creating</DBInstanceStatus>"));
3411        assert!(xml.contains("<Address>127.0.0.1</Address><Port>15432</Port>"));
3412    }
3413
3414    #[test]
3415    fn parse_tags_reads_rds_query_shape() {
3416        let request = request(
3417            "AddTagsToResource",
3418            &[
3419                ("Tags.Tag.1.Key", "env"),
3420                ("Tags.Tag.1.Value", "dev"),
3421                ("Tags.Tag.2.Key", "team"),
3422                ("Tags.Tag.2.Value", "core"),
3423            ],
3424        );
3425
3426        let tags = parse_tags(&request).expect("tags");
3427
3428        assert_eq!(
3429            tags,
3430            vec![
3431                RdsTag {
3432                    key: "env".to_string(),
3433                    value: "dev".to_string(),
3434                },
3435                RdsTag {
3436                    key: "team".to_string(),
3437                    value: "core".to_string(),
3438                }
3439            ]
3440        );
3441    }
3442
3443    #[test]
3444    fn parse_tag_keys_reads_member_shape() {
3445        let request = request(
3446            "RemoveTagsFromResource",
3447            &[("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")],
3448        );
3449
3450        let tag_keys = parse_tag_keys(&request).expect("tag keys");
3451
3452        assert_eq!(tag_keys, vec!["env".to_string(), "team".to_string()]);
3453    }
3454
3455    #[test]
3456    fn merge_tags_updates_existing_values() {
3457        let mut tags = vec![RdsTag {
3458            key: "env".to_string(),
3459            value: "dev".to_string(),
3460        }];
3461
3462        merge_tags(
3463            &mut tags,
3464            &[
3465                RdsTag {
3466                    key: "env".to_string(),
3467                    value: "prod".to_string(),
3468                },
3469                RdsTag {
3470                    key: "team".to_string(),
3471                    value: "core".to_string(),
3472                },
3473            ],
3474        );
3475
3476        assert_eq!(tags.len(), 2);
3477        assert_eq!(tags[0].value, "prod");
3478        assert_eq!(tags[1].key, "team");
3479    }
3480
3481    #[tokio::test]
3482    async fn describe_engine_versions_returns_xml_body() {
3483        let service = RdsService::new(Arc::new(RwLock::new(
3484            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3485        )));
3486        let request = request("DescribeDBEngineVersions", &[("Engine", "postgres")]);
3487
3488        let response = service.handle(request).await.expect("response");
3489        let body = String::from_utf8(response.body.expect_bytes().to_vec()).expect("utf8");
3490
3491        assert!(body.contains("<DescribeDBEngineVersionsResponse"));
3492        assert!(body.contains("<Engine>postgres</Engine>"));
3493        assert!(body.contains("<DBParameterGroupFamily>postgres16</DBParameterGroupFamily>"));
3494    }
3495
3496    fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
3497        let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
3498        for (key, value) in params {
3499            query_params.insert((*key).to_string(), (*value).to_string());
3500        }
3501
3502        AwsRequest {
3503            service: "rds".to_string(),
3504            action: action.to_string(),
3505            region: "us-east-1".to_string(),
3506            account_id: "123456789012".to_string(),
3507            request_id: "test-request-id".to_string(),
3508            headers: HeaderMap::new(),
3509            query_params,
3510            body: Bytes::new(),
3511            body_stream: parking_lot::Mutex::new(None),
3512            path_segments: vec![],
3513            raw_path: "/".to_string(),
3514            raw_query: String::new(),
3515            method: Method::POST,
3516            is_query_protocol: true,
3517            access_key_id: None,
3518            principal: None,
3519        }
3520    }
3521
3522    // ── Helpers for handler tests ────────────────────────────────────
3523
3524    fn make_service() -> RdsService {
3525        RdsService::new(Arc::new(RwLock::new(
3526            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3527        )))
3528    }
3529
3530    #[derive(Default)]
3531    struct CapturedEvent {
3532        source: String,
3533        detail_type: String,
3534        detail: String,
3535    }
3536
3537    #[derive(Default)]
3538    struct RecordingEb {
3539        events: std::sync::Mutex<Vec<CapturedEvent>>,
3540    }
3541
3542    impl fakecloud_core::delivery::EventBridgeDelivery for RecordingEb {
3543        fn put_event(&self, source: &str, detail_type: &str, detail: &str, _bus: &str) {
3544            self.events.lock().unwrap().push(CapturedEvent {
3545                source: source.to_string(),
3546                detail_type: detail_type.to_string(),
3547                detail: detail.to_string(),
3548            });
3549        }
3550    }
3551
3552    fn make_service_with_recorder() -> (RdsService, Arc<RecordingEb>) {
3553        let recorder = Arc::new(RecordingEb::default());
3554        let bus = Arc::new(DeliveryBus::new().with_eventbridge(recorder.clone()));
3555        let svc = RdsService::new(Arc::new(RwLock::new(
3556            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3557        )))
3558        .with_delivery_bus(bus);
3559        (svc, recorder)
3560    }
3561
3562    #[test]
3563    fn emit_event_emits_aws_rds_event_via_bus() {
3564        let (svc, rec) = make_service_with_recorder();
3565        svc.emit_event(
3566            RdsSourceType::DbInstance,
3567            "my-db",
3568            "arn:aws:rds:us-east-1:123456789012:db:my-db",
3569            "RDS-EVENT-0005",
3570            &["creation"],
3571            "DB instance created",
3572        );
3573        let events = rec.events.lock().unwrap();
3574        assert_eq!(events.len(), 1);
3575        let e = &events[0];
3576        assert_eq!(e.source, "aws.rds");
3577        assert_eq!(e.detail_type, "RDS DB Instance Event");
3578        let detail: serde_json::Value = serde_json::from_str(&e.detail).unwrap();
3579        assert_eq!(detail["EventID"], "RDS-EVENT-0005");
3580        assert_eq!(detail["SourceType"], "DB_INSTANCE");
3581        assert_eq!(detail["SourceIdentifier"], "my-db");
3582        assert_eq!(detail["Message"], "DB instance created");
3583        assert_eq!(detail["EventCategories"][0], "creation");
3584    }
3585
3586    #[test]
3587    fn emit_event_no_op_without_bus() {
3588        let svc = make_service();
3589        svc.emit_event(
3590            RdsSourceType::DbSnapshot,
3591            "snap",
3592            "arn:aws:rds:us-east-1:123456789012:snapshot:snap",
3593            "RDS-EVENT-0042",
3594            &["creation"],
3595            "Manual snapshot created",
3596        );
3597    }
3598
3599    #[test]
3600    fn rds_source_type_detail_type_mapping() {
3601        assert_eq!(
3602            RdsSourceType::DbInstance.detail_type(),
3603            "RDS DB Instance Event"
3604        );
3605        assert_eq!(
3606            RdsSourceType::DbSnapshot.detail_type(),
3607            "RDS DB Snapshot Event"
3608        );
3609        assert_eq!(
3610            RdsSourceType::DbParameterGroup.detail_type(),
3611            "RDS DB Parameter Group Event"
3612        );
3613    }
3614
3615    fn body_of(resp: fakecloud_core::service::AwsResponse) -> String {
3616        String::from_utf8(resp.body.expect_bytes().to_vec()).expect("utf8")
3617    }
3618
3619    fn seed_instance(svc: &RdsService, identifier: &str) -> String {
3620        let arn = format!("arn:aws:rds:us-east-1:123456789012:db:{identifier}");
3621        let mut accounts = svc.state.write();
3622        let state = accounts.default_mut();
3623        state.instances.insert(
3624            identifier.to_string(),
3625            DbInstance {
3626                db_instance_identifier: identifier.to_string(),
3627                db_instance_arn: arn.clone(),
3628                db_instance_class: "db.t3.micro".to_string(),
3629                engine: "postgres".to_string(),
3630                engine_version: "16.3".to_string(),
3631                db_instance_status: "available".to_string(),
3632                master_username: "admin".to_string(),
3633                db_name: Some("appdb".to_string()),
3634                endpoint_address: "127.0.0.1".to_string(),
3635                port: 15432,
3636                allocated_storage: 20,
3637                publicly_accessible: true,
3638                deletion_protection: false,
3639                created_at: Utc::now(),
3640                dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3641                master_user_password: "secret".to_string(),
3642                container_id: "container".to_string(),
3643                host_port: 15432,
3644                tags: Vec::new(),
3645                read_replica_source_db_instance_identifier: None,
3646                read_replica_db_instance_identifiers: Vec::new(),
3647                vpc_security_group_ids: vec!["sg-12345678".to_string()],
3648                db_parameter_group_name: Some("default.postgres16".to_string()),
3649                backup_retention_period: 1,
3650                preferred_backup_window: "03:00-04:00".to_string(),
3651                latest_restorable_time: None,
3652                option_group_name: None,
3653                multi_az: false,
3654                pending_modified_values: None,
3655            },
3656        );
3657        arn
3658    }
3659
3660    fn assert_code<T>(result: Result<T, AwsServiceError>, expected_code: &str) -> AwsServiceError {
3661        match result {
3662            Ok(_) => panic!("expected error {expected_code}, got Ok"),
3663            Err(e) => {
3664                assert_eq!(e.code(), expected_code, "wrong error code");
3665                e
3666            }
3667        }
3668    }
3669
3670    // ── Tag operations ───────────────────────────────────────────────
3671
3672    #[test]
3673    fn add_tags_requires_resource_name() {
3674        let svc = make_service();
3675        let req = request("AddTagsToResource", &[]);
3676        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3677    }
3678
3679    #[test]
3680    fn add_tags_requires_at_least_one_tag() {
3681        let svc = make_service();
3682        let arn = seed_instance(&svc, "db1");
3683        let req = request("AddTagsToResource", &[("ResourceName", arn.as_str())]);
3684        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3685    }
3686
3687    #[test]
3688    fn add_tags_appends_then_list_tags_returns_them() {
3689        let svc = make_service();
3690        let arn = seed_instance(&svc, "db1");
3691        let add_req = request(
3692            "AddTagsToResource",
3693            &[
3694                ("ResourceName", arn.as_str()),
3695                ("Tags.Tag.1.Key", "env"),
3696                ("Tags.Tag.1.Value", "dev"),
3697            ],
3698        );
3699        svc.add_tags_to_resource(&add_req).unwrap();
3700
3701        let list_req = request("ListTagsForResource", &[("ResourceName", arn.as_str())]);
3702        let body = body_of(svc.list_tags_for_resource(&list_req).unwrap());
3703        assert!(body.contains("<Key>env</Key>"));
3704        assert!(body.contains("<Value>dev</Value>"));
3705    }
3706
3707    #[test]
3708    fn list_tags_rejects_filters_param() {
3709        let svc = make_service();
3710        let arn = seed_instance(&svc, "db1");
3711        let req = request(
3712            "ListTagsForResource",
3713            &[
3714                ("ResourceName", arn.as_str()),
3715                ("Filters.Filter.1.Name", "x"),
3716            ],
3717        );
3718        assert_code(svc.list_tags_for_resource(&req), "InvalidParameterValue");
3719    }
3720
3721    #[test]
3722    fn list_tags_unknown_arn_errors() {
3723        let svc = make_service();
3724        let req = request(
3725            "ListTagsForResource",
3726            &[("ResourceName", "arn:aws:rds:us-east-1:123456789012:db:nope")],
3727        );
3728        assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
3729    }
3730
3731    #[test]
3732    fn remove_tags_strips_only_listed_keys() {
3733        let svc = make_service();
3734        let arn = seed_instance(&svc, "db1");
3735        {
3736            let mut __a = svc.state.write();
3737            let state = __a.default_mut();
3738            let inst = state.instances.get_mut("db1").unwrap();
3739            inst.tags = vec![
3740                RdsTag {
3741                    key: "env".to_string(),
3742                    value: "dev".to_string(),
3743                },
3744                RdsTag {
3745                    key: "team".to_string(),
3746                    value: "core".to_string(),
3747                },
3748            ];
3749        }
3750        let req = request(
3751            "RemoveTagsFromResource",
3752            &[("ResourceName", arn.as_str()), ("TagKeys.member.1", "env")],
3753        );
3754        svc.remove_tags_from_resource(&req).unwrap();
3755
3756        let __a = svc.state.read();
3757        let state = __a.default_ref();
3758        let tags = &state.instances.get("db1").unwrap().tags;
3759        assert_eq!(tags.len(), 1);
3760        assert_eq!(tags[0].key, "team");
3761    }
3762
3763    #[test]
3764    fn remove_tags_requires_keys() {
3765        let svc = make_service();
3766        let arn = seed_instance(&svc, "db1");
3767        let req = request("RemoveTagsFromResource", &[("ResourceName", arn.as_str())]);
3768        assert_code(svc.remove_tags_from_resource(&req), "MissingParameter");
3769    }
3770
3771    // ── DB Subnet Groups ─────────────────────────────────────────────
3772
3773    fn create_subnet_group(svc: &RdsService, name: &str) {
3774        let req = request(
3775            "CreateDBSubnetGroup",
3776            &[
3777                ("DBSubnetGroupName", name),
3778                ("DBSubnetGroupDescription", "test"),
3779                ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3780                ("SubnetIds.SubnetIdentifier.2", "subnet-bbb"),
3781            ],
3782        );
3783        svc.create_db_subnet_group(&req).unwrap();
3784    }
3785
3786    #[test]
3787    fn create_db_subnet_group_requires_two_subnets() {
3788        let svc = make_service();
3789        let req = request(
3790            "CreateDBSubnetGroup",
3791            &[
3792                ("DBSubnetGroupName", "sg1"),
3793                ("DBSubnetGroupDescription", "t"),
3794                ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3795            ],
3796        );
3797        assert_code(
3798            svc.create_db_subnet_group(&req),
3799            "DBSubnetGroupDoesNotCoverEnoughAZs",
3800        );
3801    }
3802
3803    #[test]
3804    fn create_db_subnet_group_rejects_empty_subnets() {
3805        let svc = make_service();
3806        let req = request(
3807            "CreateDBSubnetGroup",
3808            &[
3809                ("DBSubnetGroupName", "sg1"),
3810                ("DBSubnetGroupDescription", "t"),
3811            ],
3812        );
3813        assert_code(svc.create_db_subnet_group(&req), "InvalidParameterValue");
3814    }
3815
3816    #[test]
3817    fn create_db_subnet_group_rejects_duplicates() {
3818        let svc = make_service();
3819        create_subnet_group(&svc, "sg1");
3820        let req = request(
3821            "CreateDBSubnetGroup",
3822            &[
3823                ("DBSubnetGroupName", "sg1"),
3824                ("DBSubnetGroupDescription", "t"),
3825                ("SubnetIds.SubnetIdentifier.1", "subnet-x"),
3826                ("SubnetIds.SubnetIdentifier.2", "subnet-y"),
3827            ],
3828        );
3829        assert_code(
3830            svc.create_db_subnet_group(&req),
3831            "DBSubnetGroupAlreadyExists",
3832        );
3833    }
3834
3835    #[test]
3836    fn describe_db_subnet_groups_by_name_or_list() {
3837        let svc = make_service();
3838        create_subnet_group(&svc, "sg-alpha");
3839        create_subnet_group(&svc, "sg-beta");
3840
3841        let by_name = request(
3842            "DescribeDBSubnetGroups",
3843            &[("DBSubnetGroupName", "sg-alpha")],
3844        );
3845        let body = body_of(svc.describe_db_subnet_groups(&by_name).unwrap());
3846        assert!(body.contains("sg-alpha"));
3847        assert!(!body.contains("sg-beta"));
3848
3849        let list_all = request("DescribeDBSubnetGroups", &[]);
3850        let body = body_of(svc.describe_db_subnet_groups(&list_all).unwrap());
3851        assert!(body.contains("sg-alpha"));
3852        assert!(body.contains("sg-beta"));
3853    }
3854
3855    #[test]
3856    fn describe_db_subnet_groups_unknown_name_errors() {
3857        let svc = make_service();
3858        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
3859        assert_code(
3860            svc.describe_db_subnet_groups(&req),
3861            "DBSubnetGroupNotFoundFault",
3862        );
3863    }
3864
3865    #[test]
3866    fn delete_db_subnet_group_unknown_errors() {
3867        let svc = make_service();
3868        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
3869        assert_code(
3870            svc.delete_db_subnet_group(&req),
3871            "DBSubnetGroupNotFoundFault",
3872        );
3873    }
3874
3875    #[test]
3876    fn delete_db_subnet_group_removes_entry() {
3877        let svc = make_service();
3878        create_subnet_group(&svc, "sg1");
3879        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "sg1")]);
3880        svc.delete_db_subnet_group(&req).unwrap();
3881        assert!(svc.state.read().default_ref().subnet_groups.is_empty());
3882    }
3883
3884    #[test]
3885    fn modify_db_subnet_group_updates_subnet_ids() {
3886        let svc = make_service();
3887        create_subnet_group(&svc, "sg1");
3888        let req = request(
3889            "ModifyDBSubnetGroup",
3890            &[
3891                ("DBSubnetGroupName", "sg1"),
3892                ("SubnetIds.SubnetIdentifier.1", "subnet-new1"),
3893                ("SubnetIds.SubnetIdentifier.2", "subnet-new2"),
3894            ],
3895        );
3896        svc.modify_db_subnet_group(&req).unwrap();
3897
3898        let __a = svc.state.read();
3899        let state = __a.default_ref();
3900        let sg = state.subnet_groups.get("sg1").unwrap();
3901        assert_eq!(sg.subnet_ids, vec!["subnet-new1", "subnet-new2"]);
3902    }
3903
3904    // ── DB Parameter Groups ──────────────────────────────────────────
3905
3906    fn create_param_group(svc: &RdsService, name: &str) {
3907        let req = request(
3908            "CreateDBParameterGroup",
3909            &[
3910                ("DBParameterGroupName", name),
3911                ("DBParameterGroupFamily", "postgres16"),
3912                ("Description", "test"),
3913            ],
3914        );
3915        svc.create_db_parameter_group(&req).unwrap();
3916    }
3917
3918    #[test]
3919    fn create_db_parameter_group_rejects_unknown_family() {
3920        let svc = make_service();
3921        let req = request(
3922            "CreateDBParameterGroup",
3923            &[
3924                ("DBParameterGroupName", "pg1"),
3925                ("DBParameterGroupFamily", "oracle19"),
3926                ("Description", "t"),
3927            ],
3928        );
3929        assert_code(svc.create_db_parameter_group(&req), "InvalidParameterValue");
3930    }
3931
3932    #[test]
3933    fn create_db_parameter_group_rejects_duplicates() {
3934        let svc = make_service();
3935        create_param_group(&svc, "pg1");
3936        let req = request(
3937            "CreateDBParameterGroup",
3938            &[
3939                ("DBParameterGroupName", "pg1"),
3940                ("DBParameterGroupFamily", "postgres16"),
3941                ("Description", "t"),
3942            ],
3943        );
3944        assert_code(
3945            svc.create_db_parameter_group(&req),
3946            "DBParameterGroupAlreadyExists",
3947        );
3948    }
3949
3950    #[test]
3951    fn describe_db_parameter_groups_by_name_or_list() {
3952        let svc = make_service();
3953        create_param_group(&svc, "pg-alpha");
3954        create_param_group(&svc, "pg-beta");
3955        let by_name = request(
3956            "DescribeDBParameterGroups",
3957            &[("DBParameterGroupName", "pg-alpha")],
3958        );
3959        let body = body_of(svc.describe_db_parameter_groups(&by_name).unwrap());
3960        assert!(body.contains("pg-alpha"));
3961        assert!(!body.contains("pg-beta"));
3962        let list = request("DescribeDBParameterGroups", &[]);
3963        let body = body_of(svc.describe_db_parameter_groups(&list).unwrap());
3964        assert!(body.contains("pg-alpha"));
3965        assert!(body.contains("pg-beta"));
3966    }
3967
3968    #[test]
3969    fn describe_db_parameter_groups_unknown_name_errors() {
3970        let svc = make_service();
3971        let req = request(
3972            "DescribeDBParameterGroups",
3973            &[("DBParameterGroupName", "ghost")],
3974        );
3975        assert_code(
3976            svc.describe_db_parameter_groups(&req),
3977            "DBParameterGroupNotFound",
3978        );
3979    }
3980
3981    #[test]
3982    fn delete_db_parameter_group_rejects_default_groups() {
3983        let svc = make_service();
3984        let req = request(
3985            "DeleteDBParameterGroup",
3986            &[("DBParameterGroupName", "default.postgres16")],
3987        );
3988        assert_code(svc.delete_db_parameter_group(&req), "InvalidParameterValue");
3989    }
3990
3991    #[test]
3992    fn delete_db_parameter_group_unknown_errors() {
3993        let svc = make_service();
3994        let req = request(
3995            "DeleteDBParameterGroup",
3996            &[("DBParameterGroupName", "ghost")],
3997        );
3998        assert_code(
3999            svc.delete_db_parameter_group(&req),
4000            "DBParameterGroupNotFound",
4001        );
4002    }
4003
4004    #[test]
4005    fn delete_db_parameter_group_removes_entry() {
4006        let svc = make_service();
4007        create_param_group(&svc, "pg1");
4008        let req = request("DeleteDBParameterGroup", &[("DBParameterGroupName", "pg1")]);
4009        svc.delete_db_parameter_group(&req).unwrap();
4010        assert!(!svc
4011            .state
4012            .read()
4013            .default_ref()
4014            .parameter_groups
4015            .contains_key("pg1"));
4016    }
4017
4018    #[test]
4019    fn modify_db_parameter_group_updates_description() {
4020        let svc = make_service();
4021        create_param_group(&svc, "pg1");
4022        let req = request(
4023            "ModifyDBParameterGroup",
4024            &[
4025                ("DBParameterGroupName", "pg1"),
4026                ("Description", "shiny new"),
4027            ],
4028        );
4029        svc.modify_db_parameter_group(&req).unwrap();
4030        let __a = svc.state.read();
4031        let state = __a.default_ref();
4032        assert_eq!(
4033            state.parameter_groups.get("pg1").unwrap().description,
4034            "shiny new"
4035        );
4036    }
4037
4038    #[test]
4039    fn modify_db_parameter_group_unknown_errors() {
4040        let svc = make_service();
4041        let req = request(
4042            "ModifyDBParameterGroup",
4043            &[("DBParameterGroupName", "ghost"), ("Description", "x")],
4044        );
4045        assert_code(
4046            svc.modify_db_parameter_group(&req),
4047            "DBParameterGroupNotFound",
4048        );
4049    }
4050
4051    // ── DescribeDBInstances ──────────────────────────────────────────
4052
4053    #[test]
4054    fn describe_db_instances_by_id_returns_only_one() {
4055        let svc = make_service();
4056        seed_instance(&svc, "db1");
4057        seed_instance(&svc, "db2");
4058        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "db1")]);
4059        let body = body_of(svc.describe_db_instances(&req).unwrap());
4060        assert!(body.contains("<DBInstanceIdentifier>db1</DBInstanceIdentifier>"));
4061        assert!(!body.contains("<DBInstanceIdentifier>db2</DBInstanceIdentifier>"));
4062    }
4063
4064    #[test]
4065    fn describe_db_instances_unknown_id_errors() {
4066        let svc = make_service();
4067        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
4068        assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
4069    }
4070
4071    #[test]
4072    fn describe_db_instances_lists_all_when_unbounded() {
4073        let svc = make_service();
4074        seed_instance(&svc, "db1");
4075        seed_instance(&svc, "db2");
4076        seed_instance(&svc, "db3");
4077        let req = request("DescribeDBInstances", &[]);
4078        let body = body_of(svc.describe_db_instances(&req).unwrap());
4079        for id in ["db1", "db2", "db3"] {
4080            assert!(body.contains(&format!(
4081                "<DBInstanceIdentifier>{id}</DBInstanceIdentifier>"
4082            )));
4083        }
4084    }
4085
4086    // ── ModifyDBInstance ─────────────────────────────────────────────
4087
4088    #[test]
4089    fn modify_db_instance_requires_at_least_one_change() {
4090        let svc = make_service();
4091        seed_instance(&svc, "db1");
4092        let req = request("ModifyDBInstance", &[("DBInstanceIdentifier", "db1")]);
4093        assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
4094    }
4095
4096    #[test]
4097    fn modify_db_instance_unknown_errors() {
4098        let svc = make_service();
4099        let req = request(
4100            "ModifyDBInstance",
4101            &[
4102                ("DBInstanceIdentifier", "ghost"),
4103                ("DBInstanceClass", "db.t3.small"),
4104            ],
4105        );
4106        assert_code(svc.modify_db_instance(&req), "DBInstanceNotFound");
4107    }
4108
4109    #[test]
4110    fn modify_db_instance_apply_immediately_updates_class() {
4111        let svc = make_service();
4112        seed_instance(&svc, "db1");
4113        let req = request(
4114            "ModifyDBInstance",
4115            &[
4116                ("DBInstanceIdentifier", "db1"),
4117                ("DBInstanceClass", "db.t3.small"),
4118                ("ApplyImmediately", "true"),
4119            ],
4120        );
4121        svc.modify_db_instance(&req).unwrap();
4122        let __a = svc.state.read();
4123        let state = __a.default_ref();
4124        assert_eq!(
4125            state.instances.get("db1").unwrap().db_instance_class,
4126            "db.t3.small"
4127        );
4128    }
4129
4130    #[test]
4131    fn modify_db_instance_pending_when_not_apply_immediately() {
4132        let svc = make_service();
4133        seed_instance(&svc, "db1");
4134        let req = request(
4135            "ModifyDBInstance",
4136            &[
4137                ("DBInstanceIdentifier", "db1"),
4138                ("DBInstanceClass", "db.t3.small"),
4139                ("ApplyImmediately", "false"),
4140            ],
4141        );
4142        svc.modify_db_instance(&req).unwrap();
4143        let __a = svc.state.read();
4144        let state = __a.default_ref();
4145        let inst = state.instances.get("db1").unwrap();
4146        assert_eq!(inst.db_instance_class, "db.t3.micro");
4147        assert_eq!(
4148            inst.pending_modified_values
4149                .as_ref()
4150                .unwrap()
4151                .db_instance_class
4152                .as_deref(),
4153            Some("db.t3.small"),
4154        );
4155    }
4156
4157    // ── Snapshots (sync ops only) ────────────────────────────────────
4158
4159    fn seed_snapshot(svc: &RdsService, snapshot_id: &str, instance_id: &str) {
4160        let mut __a = svc.state.write();
4161        let state = __a.default_mut();
4162        let arn = state.db_snapshot_arn(snapshot_id);
4163        state.snapshots.insert(
4164            snapshot_id.to_string(),
4165            crate::state::DbSnapshot {
4166                db_snapshot_identifier: snapshot_id.to_string(),
4167                db_snapshot_arn: arn,
4168                db_instance_identifier: instance_id.to_string(),
4169                snapshot_create_time: Utc::now(),
4170                engine: "postgres".to_string(),
4171                engine_version: "16.3".to_string(),
4172                allocated_storage: 20,
4173                status: "available".to_string(),
4174                port: 5432,
4175                master_username: "admin".to_string(),
4176                db_name: Some("appdb".to_string()),
4177                dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
4178                snapshot_type: "manual".to_string(),
4179                master_user_password: "secret".to_string(),
4180                tags: Vec::new(),
4181                dump_data: Vec::new(),
4182            },
4183        );
4184    }
4185
4186    #[test]
4187    fn delete_db_snapshot_removes_entry() {
4188        let svc = make_service();
4189        seed_snapshot(&svc, "snap1", "db1");
4190        let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "snap1")]);
4191        svc.delete_db_snapshot(&req).unwrap();
4192        assert!(svc.state.read().default_ref().snapshots.is_empty());
4193    }
4194
4195    #[test]
4196    fn delete_db_snapshot_unknown_errors() {
4197        let svc = make_service();
4198        let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "ghost")]);
4199        assert_code(svc.delete_db_snapshot(&req), "DBSnapshotNotFound");
4200    }
4201
4202    #[test]
4203    fn describe_db_snapshots_rejects_both_filters() {
4204        let svc = make_service();
4205        let req = request(
4206            "DescribeDBSnapshots",
4207            &[("DBSnapshotIdentifier", "s"), ("DBInstanceIdentifier", "i")],
4208        );
4209        assert_code(
4210            svc.describe_db_snapshots(&req),
4211            "InvalidParameterCombination",
4212        );
4213    }
4214
4215    #[test]
4216    fn describe_db_snapshots_by_id_or_instance() {
4217        let svc = make_service();
4218        seed_snapshot(&svc, "snap1", "db1");
4219        seed_snapshot(&svc, "snap2", "db2");
4220
4221        let by_id = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "snap1")]);
4222        let body = body_of(svc.describe_db_snapshots(&by_id).unwrap());
4223        assert!(body.contains("snap1"));
4224        assert!(!body.contains("snap2"));
4225
4226        let by_instance = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "db2")]);
4227        let body = body_of(svc.describe_db_snapshots(&by_instance).unwrap());
4228        assert!(body.contains("snap2"));
4229        assert!(!body.contains("snap1"));
4230
4231        let list_all = request("DescribeDBSnapshots", &[]);
4232        let body = body_of(svc.describe_db_snapshots(&list_all).unwrap());
4233        assert!(body.contains("snap1"));
4234        assert!(body.contains("snap2"));
4235    }
4236
4237    #[test]
4238    fn describe_db_snapshots_unknown_id_errors() {
4239        let svc = make_service();
4240        let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "ghost")]);
4241        assert_code(svc.describe_db_snapshots(&req), "DBSnapshotNotFound");
4242    }
4243
4244    // ── Error branch tests ──
4245
4246    #[test]
4247    fn describe_db_instances_not_found() {
4248        let svc = make_service();
4249        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
4250        assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
4251    }
4252
4253    #[tokio::test]
4254    async fn delete_db_instance_not_found() {
4255        let svc = make_service();
4256        let req = request(
4257            "DeleteDBInstance",
4258            &[
4259                ("DBInstanceIdentifier", "ghost"),
4260                ("SkipFinalSnapshot", "true"),
4261            ],
4262        );
4263        assert_code(svc.delete_db_instance(&req).await, "DBInstanceNotFound");
4264    }
4265
4266    #[test]
4267    fn modify_db_instance_not_found() {
4268        let svc = make_service();
4269        let req = request(
4270            "ModifyDBInstance",
4271            &[
4272                ("DBInstanceIdentifier", "ghost"),
4273                ("AllocatedStorage", "20"),
4274            ],
4275        );
4276        // Validation fires before existence check
4277        assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
4278    }
4279
4280    #[tokio::test]
4281    async fn reboot_db_instance_not_found() {
4282        let svc = make_service();
4283        let req = request("RebootDBInstance", &[("DBInstanceIdentifier", "ghost")]);
4284        assert_code(svc.reboot_db_instance(&req).await, "DBInstanceNotFound");
4285    }
4286
4287    #[tokio::test]
4288    async fn create_db_snapshot_instance_not_found() {
4289        let svc = make_service();
4290        let req = request(
4291            "CreateDBSnapshot",
4292            &[
4293                ("DBInstanceIdentifier", "ghost"),
4294                ("DBSnapshotIdentifier", "snap1"),
4295            ],
4296        );
4297        assert_code(svc.create_db_snapshot(&req).await, "InvalidParameterValue");
4298    }
4299
4300    #[tokio::test]
4301    async fn restore_db_instance_snapshot_not_found() {
4302        let svc = make_service();
4303        let req = request(
4304            "RestoreDBInstanceFromDBSnapshot",
4305            &[
4306                ("DBInstanceIdentifier", "restored"),
4307                ("DBSnapshotIdentifier", "ghost-snap"),
4308            ],
4309        );
4310        assert_code(
4311            svc.restore_db_instance_from_db_snapshot(&req).await,
4312            "InvalidParameterValue",
4313        );
4314    }
4315
4316    #[tokio::test]
4317    async fn create_db_instance_read_replica_source_not_found() {
4318        let svc = make_service();
4319        let req = request(
4320            "CreateDBInstanceReadReplica",
4321            &[
4322                ("DBInstanceIdentifier", "replica"),
4323                ("SourceDBInstanceIdentifier", "ghost"),
4324            ],
4325        );
4326        assert_code(
4327            svc.create_db_instance_read_replica(&req).await,
4328            "InvalidParameterValue",
4329        );
4330    }
4331
4332    #[test]
4333    fn describe_db_engine_versions_basic() {
4334        let svc = make_service();
4335        let req = request("DescribeDBEngineVersions", &[]);
4336        let resp = svc.describe_db_engine_versions(&req).unwrap();
4337        let body = body_of(resp);
4338        assert!(body.contains("<DBEngineVersions>"));
4339    }
4340
4341    #[test]
4342    fn describe_orderable_db_instance_options_basic() {
4343        let svc = make_service();
4344        let req = request("DescribeOrderableDBInstanceOptions", &[("Engine", "mysql")]);
4345        let resp = svc.describe_orderable_db_instance_options(&req).unwrap();
4346        let body = body_of(resp);
4347        assert!(body.contains("<OrderableDBInstanceOptions>"));
4348    }
4349
4350    #[test]
4351    fn describe_db_parameter_group_not_found() {
4352        let svc = make_service();
4353        let req = request(
4354            "DescribeDBParameterGroups",
4355            &[("DBParameterGroupName", "ghost")],
4356        );
4357        assert_code(
4358            svc.describe_db_parameter_groups(&req),
4359            "DBParameterGroupNotFound",
4360        );
4361    }
4362
4363    #[test]
4364    fn delete_db_parameter_group_not_found() {
4365        let svc = make_service();
4366        let req = request(
4367            "DeleteDBParameterGroup",
4368            &[("DBParameterGroupName", "ghost")],
4369        );
4370        assert_code(
4371            svc.delete_db_parameter_group(&req),
4372            "DBParameterGroupNotFound",
4373        );
4374    }
4375
4376    #[test]
4377    fn describe_db_subnet_group_not_found() {
4378        let svc = make_service();
4379        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4380        assert_code(
4381            svc.describe_db_subnet_groups(&req),
4382            "DBSubnetGroupNotFoundFault",
4383        );
4384    }
4385
4386    #[test]
4387    fn delete_db_subnet_group_not_found() {
4388        let svc = make_service();
4389        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
4390        assert_code(
4391            svc.delete_db_subnet_group(&req),
4392            "DBSubnetGroupNotFoundFault",
4393        );
4394    }
4395
4396    #[test]
4397    fn add_tags_resource_not_found() {
4398        let svc = make_service();
4399        let req = request(
4400            "AddTagsToResource",
4401            &[
4402                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4403                ("Tags.member.1.Key", "k"),
4404                ("Tags.member.1.Value", "v"),
4405            ],
4406        );
4407        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
4408    }
4409
4410    #[test]
4411    fn list_tags_resource_not_found() {
4412        let svc = make_service();
4413        let req = request(
4414            "ListTagsForResource",
4415            &[("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost")],
4416        );
4417        assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
4418    }
4419
4420    // ── snapshot operations ──
4421
4422    #[tokio::test]
4423    async fn create_db_snapshot_missing_id_errors() {
4424        let svc = make_service();
4425        let req = request(
4426            "CreateDBSnapshot",
4427            &[("DBInstanceIdentifier", "nonexistent")],
4428        );
4429        assert_code(svc.create_db_snapshot(&req).await, "MissingParameter");
4430    }
4431
4432    #[tokio::test]
4433    async fn create_db_snapshot_unknown_instance_errors() {
4434        let svc = make_service();
4435        let req = request(
4436            "CreateDBSnapshot",
4437            &[
4438                ("DBSnapshotIdentifier", "snap1"),
4439                ("DBInstanceIdentifier", "ghost"),
4440            ],
4441        );
4442        assert!(svc.create_db_snapshot(&req).await.is_err());
4443    }
4444
4445    // ── delete_db_instance ──
4446
4447    #[tokio::test]
4448    async fn delete_db_instance_missing_id_errors() {
4449        let svc = make_service();
4450        let req = request("DeleteDBInstance", &[]);
4451        assert_code(svc.delete_db_instance(&req).await, "MissingParameter");
4452    }
4453
4454    // ── reboot_db_instance ──
4455
4456    #[tokio::test]
4457    async fn reboot_db_instance_missing_id_errors() {
4458        let svc = make_service();
4459        let req = request("RebootDBInstance", &[]);
4460        assert_code(svc.reboot_db_instance(&req).await, "MissingParameter");
4461    }
4462
4463    // ── create_db_instance validation ──
4464
4465    #[tokio::test]
4466    async fn create_db_instance_missing_id_errors() {
4467        let svc = make_service();
4468        let req = request(
4469            "CreateDBInstance",
4470            &[
4471                ("Engine", "postgres"),
4472                ("DBInstanceClass", "db.t3.micro"),
4473                ("AllocatedStorage", "20"),
4474                ("MasterUsername", "admin"),
4475                ("MasterUserPassword", "secretpass"),
4476            ],
4477        );
4478        assert!(svc.create_db_instance(&req).await.is_err());
4479    }
4480
4481    #[tokio::test]
4482    async fn create_db_instance_unsupported_engine_errors() {
4483        let svc = make_service();
4484        let req = request(
4485            "CreateDBInstance",
4486            &[
4487                ("DBInstanceIdentifier", "db1"),
4488                ("Engine", "mongodb"),
4489                ("DBInstanceClass", "db.t3.micro"),
4490                ("AllocatedStorage", "20"),
4491                ("MasterUsername", "admin"),
4492                ("MasterUserPassword", "secretpass"),
4493            ],
4494        );
4495        assert!(svc.create_db_instance(&req).await.is_err());
4496    }
4497
4498    // ── restore_db_instance_from_db_snapshot ──
4499
4500    #[tokio::test]
4501    async fn restore_db_instance_missing_ids_errors() {
4502        let svc = make_service();
4503        let req = request("RestoreDBInstanceFromDBSnapshot", &[]);
4504        assert!(svc
4505            .restore_db_instance_from_db_snapshot(&req)
4506            .await
4507            .is_err());
4508    }
4509
4510    #[tokio::test]
4511    async fn restore_db_instance_unknown_snapshot_errors() {
4512        let svc = make_service();
4513        let req = request(
4514            "RestoreDBInstanceFromDBSnapshot",
4515            &[
4516                ("DBInstanceIdentifier", "restored"),
4517                ("DBSnapshotIdentifier", "missing"),
4518            ],
4519        );
4520        assert!(svc
4521            .restore_db_instance_from_db_snapshot(&req)
4522            .await
4523            .is_err());
4524    }
4525
4526    // ── create_db_instance_read_replica ──
4527
4528    #[tokio::test]
4529    async fn create_read_replica_missing_source_errors() {
4530        let svc = make_service();
4531        let req = request(
4532            "CreateDBInstanceReadReplica",
4533            &[("DBInstanceIdentifier", "replica1")],
4534        );
4535        assert!(svc.create_db_instance_read_replica(&req).await.is_err());
4536    }
4537
4538    #[tokio::test]
4539    async fn create_read_replica_unknown_source_errors() {
4540        let svc = make_service();
4541        let req = request(
4542            "CreateDBInstanceReadReplica",
4543            &[
4544                ("DBInstanceIdentifier", "replica1"),
4545                ("SourceDBInstanceIdentifier", "ghost"),
4546            ],
4547        );
4548        assert!(svc.create_db_instance_read_replica(&req).await.is_err());
4549    }
4550
4551    // ── describe_db_snapshots with filters ──
4552
4553    #[test]
4554    fn describe_db_snapshots_by_snapshot_id_only() {
4555        let svc = make_service();
4556        seed_snapshot(&svc, "s1", "inst1");
4557        let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "s1")]);
4558        let resp = svc.describe_db_snapshots(&req).unwrap();
4559        let b = body_of(resp);
4560        assert!(b.contains("<DBSnapshotIdentifier>s1</DBSnapshotIdentifier>"));
4561    }
4562
4563    #[test]
4564    fn describe_db_snapshots_by_instance_id_returns_matching() {
4565        let svc = make_service();
4566        seed_snapshot(&svc, "s1", "inst1");
4567        seed_snapshot(&svc, "s2", "inst2");
4568        let req = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "inst1")]);
4569        let resp = svc.describe_db_snapshots(&req).unwrap();
4570        let b = body_of(resp);
4571        assert!(b.contains("s1"));
4572        assert!(!b.contains("<DBSnapshotIdentifier>s2</DBSnapshotIdentifier>"));
4573    }
4574
4575    // ── modify_db_parameter_group ──
4576
4577    #[test]
4578    fn modify_db_parameter_group_missing_name() {
4579        let svc = make_service();
4580        let req = request("ModifyDBParameterGroup", &[]);
4581        assert!(svc.modify_db_parameter_group(&req).is_err());
4582    }
4583
4584    // ── modify_db_subnet_group ──
4585
4586    #[test]
4587    fn modify_db_subnet_group_unknown_errors() {
4588        let svc = make_service();
4589        let req = request(
4590            "ModifyDBSubnetGroup",
4591            &[
4592                ("DBSubnetGroupName", "ghost"),
4593                ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4594                ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4595            ],
4596        );
4597        assert!(svc.modify_db_subnet_group(&req).is_err());
4598    }
4599
4600    // ── describe_db_instances ──
4601
4602    #[test]
4603    fn describe_db_instances_empty_returns_xml() {
4604        let svc = make_service();
4605        let req = request("DescribeDBInstances", &[]);
4606        let resp = svc.describe_db_instances(&req).unwrap();
4607        let b = body_of(resp);
4608        assert!(b.contains("DescribeDBInstancesResult"));
4609    }
4610
4611    #[test]
4612    fn describe_db_snapshots_empty_returns_empty_list() {
4613        let svc = make_service();
4614        let req = request("DescribeDBSnapshots", &[]);
4615        let resp = svc.describe_db_snapshots(&req).unwrap();
4616        let b = body_of(resp);
4617        assert!(b.contains("DescribeDBSnapshotsResult"));
4618    }
4619
4620    #[test]
4621    fn add_tags_unknown_resource_errors() {
4622        let svc = make_service();
4623        let req = request(
4624            "AddTagsToResource",
4625            &[
4626                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4627                ("Tags.member.1.Key", "k"),
4628                ("Tags.member.1.Value", "v"),
4629            ],
4630        );
4631        assert!(svc.add_tags_to_resource(&req).is_err());
4632    }
4633
4634    #[test]
4635    fn remove_tags_unknown_resource_errors() {
4636        let svc = make_service();
4637        let req = request(
4638            "RemoveTagsFromResource",
4639            &[
4640                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4641                ("TagKeys.member.1", "k"),
4642            ],
4643        );
4644        assert!(svc.remove_tags_from_resource(&req).is_err());
4645    }
4646
4647    #[test]
4648    fn create_db_parameter_group_missing_name_errors() {
4649        let svc = make_service();
4650        let req = request(
4651            "CreateDBParameterGroup",
4652            &[
4653                ("DBParameterGroupFamily", "postgres16"),
4654                ("Description", "d"),
4655            ],
4656        );
4657        assert!(svc.create_db_parameter_group(&req).is_err());
4658    }
4659
4660    #[test]
4661    fn create_db_subnet_group_missing_desc_errors() {
4662        let svc = make_service();
4663        let req = request(
4664            "CreateDBSubnetGroup",
4665            &[
4666                ("DBSubnetGroupName", "sg1"),
4667                ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4668                ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4669            ],
4670        );
4671        assert!(svc.create_db_subnet_group(&req).is_err());
4672    }
4673
4674    #[tokio::test]
4675    async fn create_db_instance_missing_class_errors() {
4676        let svc = make_service();
4677        let req = request(
4678            "CreateDBInstance",
4679            &[
4680                ("DBInstanceIdentifier", "miss-class"),
4681                ("Engine", "postgres"),
4682                ("AllocatedStorage", "20"),
4683                ("MasterUsername", "admin"),
4684                ("MasterUserPassword", "secretpass"),
4685            ],
4686        );
4687        assert!(svc.create_db_instance(&req).await.is_err());
4688    }
4689
4690    #[tokio::test]
4691    async fn create_db_instance_missing_master_username_errors() {
4692        let svc = make_service();
4693        let req = request(
4694            "CreateDBInstance",
4695            &[
4696                ("DBInstanceIdentifier", "miss-mu"),
4697                ("Engine", "postgres"),
4698                ("DBInstanceClass", "db.t3.micro"),
4699                ("AllocatedStorage", "20"),
4700                ("MasterUserPassword", "secretpass"),
4701            ],
4702        );
4703        assert!(svc.create_db_instance(&req).await.is_err());
4704    }
4705
4706    #[test]
4707    fn modify_db_instance_missing_id_errors() {
4708        let svc = make_service();
4709        let req = request("ModifyDBInstance", &[]);
4710        assert!(svc.modify_db_instance(&req).is_err());
4711    }
4712
4713    #[test]
4714    fn modify_db_parameter_group_unknown_pg_errors() {
4715        let svc = make_service();
4716        let req = request(
4717            "ModifyDBParameterGroup",
4718            &[
4719                ("DBParameterGroupName", "ghost"),
4720                ("Parameters.member.1.ParameterName", "p"),
4721                ("Parameters.member.1.ParameterValue", "v"),
4722                ("Parameters.member.1.ApplyMethod", "immediate"),
4723            ],
4724        );
4725        assert!(svc.modify_db_parameter_group(&req).is_err());
4726    }
4727
4728    #[test]
4729    fn describe_db_parameter_groups_unknown_errors() {
4730        let svc = make_service();
4731        let req = request(
4732            "DescribeDBParameterGroups",
4733            &[("DBParameterGroupName", "ghost")],
4734        );
4735        assert!(svc.describe_db_parameter_groups(&req).is_err());
4736    }
4737
4738    #[test]
4739    fn describe_db_subnet_groups_unknown_errors() {
4740        let svc = make_service();
4741        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4742        assert!(svc.describe_db_subnet_groups(&req).is_err());
4743    }
4744}