Skip to main content

fakecloud_rds/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::query_response_xml;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20    SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
23const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
25fn is_mutating_action(action: &str) -> bool {
26    if matches!(
27        action,
28        "AddTagsToResource"
29            | "CreateDBInstance"
30            | "CreateDBInstanceReadReplica"
31            | "CreateDBParameterGroup"
32            | "CreateDBSnapshot"
33            | "CreateDBSubnetGroup"
34            | "DeleteDBInstance"
35            | "DeleteDBParameterGroup"
36            | "DeleteDBSnapshot"
37            | "DeleteDBSubnetGroup"
38            | "ModifyDBInstance"
39            | "ModifyDBParameterGroup"
40            | "ModifyDBSubnetGroup"
41            | "RebootDBInstance"
42            | "RemoveTagsFromResource"
43            | "RestoreDBInstanceFromDBSnapshot"
44    ) {
45        return true;
46    }
47    // Heuristic for the 140 extra ops: any verb that mutates state.
48    let mutating_prefixes = [
49        "Create",
50        "Modify",
51        "Delete",
52        "Reboot",
53        "Start",
54        "Stop",
55        "Failover",
56        "Switchover",
57        "Promote",
58        "Reset",
59        "Apply",
60        "Authorize",
61        "Revoke",
62        "Add",
63        "Remove",
64        "Register",
65        "Deregister",
66        "Copy",
67        "Restore",
68        "Backtrack",
69        "Cancel",
70        "Purchase",
71        "Disable",
72        "Enable",
73    ];
74    mutating_prefixes.iter().any(|p| action.starts_with(p))
75}
76const SUPPORTED_ACTIONS: &[&str] = &[
77    "AddRoleToDBCluster",
78    "AddRoleToDBInstance",
79    "AddSourceIdentifierToSubscription",
80    "AddTagsToResource",
81    "ApplyPendingMaintenanceAction",
82    "AuthorizeDBSecurityGroupIngress",
83    "BacktrackDBCluster",
84    "CancelExportTask",
85    "CopyDBClusterParameterGroup",
86    "CopyDBClusterSnapshot",
87    "CopyDBParameterGroup",
88    "CopyDBSnapshot",
89    "CopyOptionGroup",
90    "CreateBlueGreenDeployment",
91    "CreateCustomDBEngineVersion",
92    "CreateDBCluster",
93    "CreateDBClusterEndpoint",
94    "CreateDBClusterParameterGroup",
95    "CreateDBClusterSnapshot",
96    "CreateDBInstance",
97    "CreateDBInstanceReadReplica",
98    "CreateDBParameterGroup",
99    "CreateDBProxy",
100    "CreateDBProxyEndpoint",
101    "CreateDBSecurityGroup",
102    "CreateDBShardGroup",
103    "CreateDBSnapshot",
104    "CreateDBSubnetGroup",
105    "CreateEventSubscription",
106    "CreateGlobalCluster",
107    "CreateIntegration",
108    "CreateOptionGroup",
109    "CreateTenantDatabase",
110    "DeleteBlueGreenDeployment",
111    "DeleteCustomDBEngineVersion",
112    "DeleteDBCluster",
113    "DeleteDBClusterAutomatedBackup",
114    "DeleteDBClusterEndpoint",
115    "DeleteDBClusterParameterGroup",
116    "DeleteDBClusterSnapshot",
117    "DeleteDBInstance",
118    "DeleteDBInstanceAutomatedBackup",
119    "DeleteDBParameterGroup",
120    "DeleteDBProxy",
121    "DeleteDBProxyEndpoint",
122    "DeleteDBSecurityGroup",
123    "DeleteDBShardGroup",
124    "DeleteDBSnapshot",
125    "DeleteDBSubnetGroup",
126    "DeleteEventSubscription",
127    "DeleteGlobalCluster",
128    "DeleteIntegration",
129    "DeleteOptionGroup",
130    "DeleteTenantDatabase",
131    "DeregisterDBProxyTargets",
132    "DescribeAccountAttributes",
133    "DescribeBlueGreenDeployments",
134    "DescribeCertificates",
135    "DescribeDBClusterAutomatedBackups",
136    "DescribeDBClusterBacktracks",
137    "DescribeDBClusterEndpoints",
138    "DescribeDBClusterParameterGroups",
139    "DescribeDBClusterParameters",
140    "DescribeDBClusterSnapshotAttributes",
141    "DescribeDBClusterSnapshots",
142    "DescribeDBClusters",
143    "DescribeDBEngineVersions",
144    "DescribeDBInstanceAutomatedBackups",
145    "DescribeDBInstances",
146    "DescribeDBLogFiles",
147    "DescribeDBMajorEngineVersions",
148    "DescribeDBParameterGroups",
149    "DescribeDBParameters",
150    "DescribeDBProxies",
151    "DescribeDBProxyEndpoints",
152    "DescribeDBProxyTargetGroups",
153    "DescribeDBProxyTargets",
154    "DescribeDBRecommendations",
155    "DescribeDBSecurityGroups",
156    "DescribeDBShardGroups",
157    "DescribeDBSnapshotAttributes",
158    "DescribeDBSnapshotTenantDatabases",
159    "DescribeDBSnapshots",
160    "DescribeDBSubnetGroups",
161    "DescribeEngineDefaultClusterParameters",
162    "DescribeEngineDefaultParameters",
163    "DescribeEventCategories",
164    "DescribeEventSubscriptions",
165    "DescribeEvents",
166    "DescribeExportTasks",
167    "DescribeGlobalClusters",
168    "DescribeIntegrations",
169    "DescribeOptionGroupOptions",
170    "DescribeOptionGroups",
171    "DescribeOrderableDBInstanceOptions",
172    "DescribePendingMaintenanceActions",
173    "DescribeReservedDBInstances",
174    "DescribeReservedDBInstancesOfferings",
175    "DescribeSourceRegions",
176    "DescribeTenantDatabases",
177    "DescribeValidDBInstanceModifications",
178    "DisableHttpEndpoint",
179    "DownloadDBLogFilePortion",
180    "EnableHttpEndpoint",
181    "FailoverDBCluster",
182    "FailoverGlobalCluster",
183    "ListTagsForResource",
184    "ModifyActivityStream",
185    "ModifyCertificates",
186    "ModifyCurrentDBClusterCapacity",
187    "ModifyCustomDBEngineVersion",
188    "ModifyDBCluster",
189    "ModifyDBClusterEndpoint",
190    "ModifyDBClusterParameterGroup",
191    "ModifyDBClusterSnapshotAttribute",
192    "ModifyDBInstance",
193    "ModifyDBParameterGroup",
194    "ModifyDBProxy",
195    "ModifyDBProxyEndpoint",
196    "ModifyDBProxyTargetGroup",
197    "ModifyDBRecommendation",
198    "ModifyDBShardGroup",
199    "ModifyDBSnapshot",
200    "ModifyDBSnapshotAttribute",
201    "ModifyDBSubnetGroup",
202    "ModifyEventSubscription",
203    "ModifyGlobalCluster",
204    "ModifyIntegration",
205    "ModifyOptionGroup",
206    "ModifyTenantDatabase",
207    "PromoteReadReplica",
208    "PromoteReadReplicaDBCluster",
209    "PurchaseReservedDBInstancesOffering",
210    "RebootDBCluster",
211    "RebootDBInstance",
212    "RebootDBShardGroup",
213    "RegisterDBProxyTargets",
214    "RemoveFromGlobalCluster",
215    "RemoveRoleFromDBCluster",
216    "RemoveRoleFromDBInstance",
217    "RemoveSourceIdentifierFromSubscription",
218    "RemoveTagsFromResource",
219    "ResetDBClusterParameterGroup",
220    "ResetDBParameterGroup",
221    "RestoreDBClusterFromS3",
222    "RestoreDBClusterFromSnapshot",
223    "RestoreDBClusterToPointInTime",
224    "RestoreDBInstanceFromDBSnapshot",
225    "RestoreDBInstanceFromS3",
226    "RestoreDBInstanceToPointInTime",
227    "RevokeDBSecurityGroupIngress",
228    "StartActivityStream",
229    "StartDBCluster",
230    "StartDBInstance",
231    "StartDBInstanceAutomatedBackupsReplication",
232    "StartExportTask",
233    "StopActivityStream",
234    "StopDBCluster",
235    "StopDBInstance",
236    "StopDBInstanceAutomatedBackupsReplication",
237    "SwitchoverBlueGreenDeployment",
238    "SwitchoverGlobalCluster",
239    "SwitchoverReadReplica",
240];
241
242pub struct RdsService {
243    pub(crate) state: SharedRdsState,
244    runtime: Option<Arc<RdsRuntime>>,
245    snapshot_store: Option<Arc<dyn SnapshotStore>>,
246    snapshot_lock: Arc<AsyncMutex<()>>,
247    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
248}
249
250/// Source type for RDS EventBridge events. Maps `aws.rds` detail-type.
251#[derive(Clone, Copy)]
252#[allow(dead_code, clippy::enum_variant_names)]
253pub(crate) enum RdsSourceType {
254    DbInstance,
255    DbSnapshot,
256    DbParameterGroup,
257}
258
259impl RdsSourceType {
260    fn as_str(self) -> &'static str {
261        match self {
262            Self::DbInstance => "DB_INSTANCE",
263            Self::DbSnapshot => "DB_SNAPSHOT",
264            Self::DbParameterGroup => "DB_PARAMETER_GROUP",
265        }
266    }
267
268    fn detail_type(self) -> &'static str {
269        match self {
270            Self::DbInstance => "RDS DB Instance Event",
271            Self::DbSnapshot => "RDS DB Snapshot Event",
272            Self::DbParameterGroup => "RDS DB Parameter Group Event",
273        }
274    }
275}
276
277impl RdsService {
278    pub(crate) fn state_handle(&self) -> &SharedRdsState {
279        &self.state
280    }
281}
282
283impl RdsService {
284    pub fn new(state: SharedRdsState) -> Self {
285        Self {
286            state,
287            runtime: None,
288            snapshot_store: None,
289            snapshot_lock: Arc::new(AsyncMutex::new(())),
290            delivery_bus: None,
291        }
292    }
293
294    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
295        self.runtime = Some(runtime);
296        self
297    }
298
299    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
300        self.snapshot_store = Some(store);
301        self
302    }
303
304    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
305        self.delivery_bus = Some(bus);
306        self
307    }
308
309    /// Emit an `aws.rds` EventBridge event mirroring the AWS RDS event schema.
310    /// No-op when the delivery bus isn't wired (tests, minimal configs).
311    pub(crate) fn emit_event(
312        &self,
313        source_type: RdsSourceType,
314        source_identifier: &str,
315        source_arn: &str,
316        event_id: &str,
317        event_categories: &[&str],
318        message: &str,
319    ) {
320        emit_event_static(
321            self.delivery_bus.as_ref(),
322            source_type,
323            source_identifier,
324            source_arn,
325            event_id,
326            event_categories,
327            message,
328        );
329    }
330
331    async fn save_snapshot(&self) {
332        let Some(store) = self.snapshot_store.clone() else {
333            return;
334        };
335        let _guard = self.snapshot_lock.lock().await;
336        let snapshot = RdsSnapshot {
337            schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
338            state: None,
339            accounts: Some(self.state.read().clone()),
340        };
341        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
342            let bytes = serde_json::to_vec(&snapshot)
343                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
344            store.save(&bytes)
345        })
346        .await;
347        match join {
348            Ok(Ok(())) => {}
349            Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
350            Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
351        }
352    }
353
354    /// Return the runtime or a ``ServiceUnavailable`` error if it was not configured.
355    ///
356    /// RDS operations that start, stop, or reach into a database container fail
357    /// with a consistent wire error when the daemon (Docker/Podman) is missing
358    /// rather than each caller restating the message.
359    fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
360        self.runtime.as_ref().ok_or_else(|| {
361            AwsServiceError::aws_error(
362                StatusCode::SERVICE_UNAVAILABLE,
363                "InvalidParameterValue",
364                "Docker/Podman is required for RDS DB instances but is not available",
365            )
366        })
367    }
368}
369
370#[async_trait]
371impl AwsService for RdsService {
372    fn service_name(&self) -> &str {
373        "rds"
374    }
375
376    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
377        let mutates = is_mutating_action(request.action.as_str());
378        let result = match request.action.as_str() {
379            "AddTagsToResource" => self.add_tags_to_resource(&request),
380            "CreateDBInstance" => self.create_db_instance(&request).await,
381            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
382            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
383            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
384            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
385            "DeleteDBInstance" => self.delete_db_instance(&request).await,
386            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
387            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
388            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
389            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
390            "DescribeDBInstances" => self.describe_db_instances(&request),
391            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
392            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
393            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
394            "DescribeOrderableDBInstanceOptions" => {
395                self.describe_orderable_db_instance_options(&request)
396            }
397            "ListTagsForResource" => self.list_tags_for_resource(&request),
398            "ModifyDBInstance" => self.modify_db_instance(&request),
399            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
400            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
401            "RebootDBInstance" => self.reboot_db_instance(&request).await,
402            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
403            "RestoreDBInstanceFromDBSnapshot" => {
404                self.restore_db_instance_from_db_snapshot(&request).await
405            }
406            _ => self.handle_extra_action(&request),
407        };
408        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
409            self.save_snapshot().await;
410        }
411        result
412    }
413
414    fn supported_actions(&self) -> &[&str] {
415        SUPPORTED_ACTIONS
416    }
417}
418
419impl RdsService {
420    async fn create_db_instance(
421        &self,
422        request: &AwsRequest,
423    ) -> Result<AwsResponse, AwsServiceError> {
424        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
425        let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
426        let db_instance_class = required_param(request, "DBInstanceClass")?;
427        let engine = required_param(request, "Engine")?;
428        let master_username = required_param(request, "MasterUsername")?;
429        let master_user_password = required_param(request, "MasterUserPassword")?;
430        let db_name = optional_param(request, "DBName");
431        let engine_version =
432            optional_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
433        let publicly_accessible =
434            parse_optional_bool(optional_param(request, "PubliclyAccessible").as_deref())?
435                .unwrap_or(true);
436        let deletion_protection =
437            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?
438                .unwrap_or(false);
439        let port = optional_i32_param(request, "Port")?
440            .unwrap_or_else(|| default_port_for_engine(&engine));
441        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
442
443        let db_parameter_group_name = optional_param(request, "DBParameterGroupName")
444            .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
445
446        let backup_retention_period =
447            optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
448        let preferred_backup_window = optional_param(request, "PreferredBackupWindow")
449            .unwrap_or_else(|| "03:00-04:00".to_string());
450        let option_group_name = optional_param(request, "OptionGroupName");
451        let multi_az =
452            parse_optional_bool(optional_param(request, "MultiAZ").as_deref())?.unwrap_or(false);
453
454        validate_create_request(
455            &db_instance_identifier,
456            allocated_storage,
457            &db_instance_class,
458            &engine,
459            &engine_version,
460            port,
461        )?;
462
463        {
464            let mut accounts = self.state.write();
465            let state = accounts.get_or_create(&request.account_id);
466            if !state.begin_instance_creation(&db_instance_identifier) {
467                return Err(AwsServiceError::aws_error(
468                    StatusCode::BAD_REQUEST,
469                    "DBInstanceAlreadyExists",
470                    format!("DBInstance {} already exists.", db_instance_identifier),
471                ));
472            }
473            // Validate parameter group exists if specified by the caller
474            if let Some(ref pg_name) = db_parameter_group_name {
475                if !state.parameter_groups.contains_key(pg_name) {
476                    state.cancel_instance_creation(&db_instance_identifier);
477                    return Err(AwsServiceError::aws_error(
478                        StatusCode::NOT_FOUND,
479                        "DBParameterGroupNotFound",
480                        format!("DBParameterGroup {} not found.", pg_name),
481                    ));
482                }
483            }
484        }
485
486        let runtime = self.require_runtime()?.clone();
487
488        let logical_db_name = db_name
489            .clone()
490            .unwrap_or_else(|| default_db_name(&engine).to_string());
491
492        // Insert a "creating" placeholder synchronously and spawn the
493        // container start in a background task. CreateDBInstance returns
494        // ~immediately; DescribeDBInstances flips to "available" (or
495        // "failed") when the container is up. Matches AWS RDS behavior:
496        // CreateDBInstance never blocks on the container coming up.
497        let created_at = Utc::now();
498        let instance = {
499            let mut accounts = self.state.write();
500            let state = accounts.get_or_create(&request.account_id);
501            let placeholder = DbInstance {
502                db_instance_identifier: db_instance_identifier.clone(),
503                db_instance_arn: state.db_instance_arn(&db_instance_identifier),
504                db_instance_class: db_instance_class.clone(),
505                engine: engine.clone(),
506                engine_version: engine_version.clone(),
507                db_instance_status: "creating".to_string(),
508                master_username: master_username.clone(),
509                db_name: db_name.clone(),
510                endpoint_address: String::new(),
511                port: 0,
512                allocated_storage,
513                publicly_accessible,
514                deletion_protection,
515                created_at,
516                dbi_resource_id: state.next_dbi_resource_id(),
517                master_user_password: master_user_password.clone(),
518                container_id: String::new(),
519                host_port: 0,
520                tags: Vec::new(),
521                read_replica_source_db_instance_identifier: None,
522                read_replica_db_instance_identifiers: Vec::new(),
523                vpc_security_group_ids,
524                db_parameter_group_name,
525                backup_retention_period,
526                preferred_backup_window,
527                latest_restorable_time: if backup_retention_period > 0 {
528                    Some(created_at)
529                } else {
530                    None
531                },
532                option_group_name,
533                multi_az,
534                pending_modified_values: None,
535            };
536            state.finish_instance_creation(placeholder.clone());
537            placeholder
538        };
539        let instance_arn = instance.db_instance_arn.clone();
540
541        self.emit_event(
542            RdsSourceType::DbInstance,
543            &db_instance_identifier,
544            &instance_arn,
545            "RDS-EVENT-0005",
546            &["creation"],
547            "DB instance created",
548        );
549
550        {
551            let state_handle = self.state.clone();
552            let delivery_bus = self.delivery_bus.clone();
553            let runtime = runtime.clone();
554            let id = db_instance_identifier.clone();
555            let engine = engine.clone();
556            let engine_version = engine_version.clone();
557            let master_username = master_username.clone();
558            let master_user_password = master_user_password.clone();
559            let account_id = request.account_id.clone();
560            let region = request.region.clone();
561            let arn = instance_arn.clone();
562            tokio::spawn(async move {
563                match runtime
564                    .ensure_postgres(
565                        &id,
566                        &engine,
567                        &engine_version,
568                        &master_username,
569                        &master_user_password,
570                        &logical_db_name,
571                        &account_id,
572                        &region,
573                    )
574                    .await
575                {
576                    Ok(running) => {
577                        let mut accounts = state_handle.write();
578                        let state = accounts.get_or_create(&account_id);
579                        if let Some(inst) = state.instances.get_mut(&id) {
580                            inst.db_instance_status = "available".to_string();
581                            inst.endpoint_address = "127.0.0.1".to_string();
582                            inst.port = i32::from(running.host_port);
583                            inst.host_port = running.host_port;
584                            inst.container_id = running.container_id;
585                        }
586                    }
587                    Err(error) => {
588                        tracing::error!(%error, db_instance_identifier=%id, "create_db_instance background task failed");
589                        {
590                            let mut accounts = state_handle.write();
591                            let state = accounts.get_or_create(&account_id);
592                            state.instances.remove(&id);
593                        }
594                        emit_event_static(
595                            delivery_bus.as_ref(),
596                            RdsSourceType::DbInstance,
597                            &id,
598                            &arn,
599                            "RDS-EVENT-0058",
600                            &["failure"],
601                            &format!("DB instance failed to create: {}", error),
602                        );
603                    }
604                }
605            });
606        }
607
608        Ok(AwsResponse::xml(
609            StatusCode::OK,
610            query_response_xml(
611                "CreateDBInstance",
612                RDS_NS,
613                &format!(
614                    "<DBInstance>{}</DBInstance>",
615                    db_instance_xml(&instance, None)
616                ),
617                &request.request_id,
618            ),
619        ))
620    }
621
622    async fn delete_db_instance(
623        &self,
624        request: &AwsRequest,
625    ) -> Result<AwsResponse, AwsServiceError> {
626        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
627        let skip_final_snapshot =
628            parse_optional_bool(optional_param(request, "SkipFinalSnapshot").as_deref())?
629                .unwrap_or(false);
630        let final_db_snapshot_identifier = optional_param(request, "FinalDBSnapshotIdentifier");
631
632        if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
633            return Err(AwsServiceError::aws_error(
634                StatusCode::BAD_REQUEST,
635                "InvalidParameterCombination",
636                "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
637            ));
638        }
639        if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
640            return Err(AwsServiceError::aws_error(
641                StatusCode::BAD_REQUEST,
642                "InvalidParameterCombination",
643                "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
644            ));
645        }
646
647        // Check deletion protection BEFORE creating snapshot or making any changes
648        {
649            let accounts = self.state.read();
650            let empty = RdsState::new(&request.account_id, &request.region);
651            let state = accounts.get(&request.account_id).unwrap_or(&empty);
652            if let Some(instance) = state.instances.get(&db_instance_identifier) {
653                if instance.deletion_protection {
654                    return Err(AwsServiceError::aws_error(
655                        StatusCode::BAD_REQUEST,
656                        "InvalidDBInstanceState",
657                        format!(
658                            "DBInstance {} cannot be deleted because deletion protection is enabled.",
659                            db_instance_identifier
660                        ),
661                    ));
662                }
663            } else {
664                return Err(db_instance_not_found(&db_instance_identifier));
665            }
666        }
667
668        if let Some(ref snapshot_id) = final_db_snapshot_identifier {
669            self.create_final_db_snapshot(
670                &db_instance_identifier,
671                snapshot_id,
672                &request.account_id,
673                &request.region,
674            )
675            .await?;
676        }
677
678        let instance = {
679            let mut accounts = self.state.write();
680            let state = accounts.get_or_create(&request.account_id);
681            let instance = state
682                .instances
683                .remove(&db_instance_identifier)
684                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
685
686            if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
687                if let Some(source) = state.instances.get_mut(source_id) {
688                    source
689                        .read_replica_db_instance_identifiers
690                        .retain(|id| id != &db_instance_identifier);
691                }
692            }
693
694            for replica_id in &instance.read_replica_db_instance_identifiers {
695                if let Some(replica) = state.instances.get_mut(replica_id) {
696                    replica.read_replica_source_db_instance_identifier = None;
697                }
698            }
699
700            instance
701        };
702
703        if let Some(runtime) = &self.runtime {
704            runtime.stop_container(&db_instance_identifier).await;
705        }
706
707        self.emit_event(
708            RdsSourceType::DbInstance,
709            &db_instance_identifier,
710            &instance.db_instance_arn,
711            "RDS-EVENT-0003",
712            &["deletion"],
713            "DB instance deleted",
714        );
715
716        Ok(AwsResponse::xml(
717            StatusCode::OK,
718            query_response_xml(
719                "DeleteDBInstance",
720                RDS_NS,
721                &format!(
722                    "<DBInstance>{}</DBInstance>",
723                    db_instance_xml(&instance, Some("deleting"))
724                ),
725                &request.request_id,
726            ),
727        ))
728    }
729
730    /// Take a final snapshot of an instance that is about to be deleted,
731    /// persisting the dumped database into `state.snapshots`. The DLQ-style
732    /// conflict check runs twice — once under the read lock before paying
733    /// for the dump, once under the write lock before committing — to keep
734    /// concurrent deletes from colliding.
735    async fn create_final_db_snapshot(
736        &self,
737        db_instance_identifier: &str,
738        snapshot_id: &str,
739        account_id: &str,
740        region: &str,
741    ) -> Result<(), AwsServiceError> {
742        let runtime = self.runtime.as_ref().ok_or_else(|| {
743            AwsServiceError::aws_error(
744                StatusCode::SERVICE_UNAVAILABLE,
745                "InvalidParameterValue",
746                "Docker/Podman is required for RDS snapshots but is not available",
747            )
748        })?;
749
750        let (instance_for_snapshot, db_name) = {
751            let accounts = self.state.read();
752            let empty = RdsState::new(account_id, region);
753            let state = accounts.get(account_id).unwrap_or(&empty);
754
755            if state.snapshots.contains_key(snapshot_id) {
756                return Err(AwsServiceError::aws_error(
757                    StatusCode::CONFLICT,
758                    "DBSnapshotAlreadyExists",
759                    format!("DBSnapshot {snapshot_id} already exists."),
760                ));
761            }
762
763            let instance = state
764                .instances
765                .get(db_instance_identifier)
766                .cloned()
767                .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
768
769            let default_db = default_db_name(&instance.engine);
770            let db_name = instance
771                .db_name
772                .as_deref()
773                .unwrap_or(default_db)
774                .to_string();
775
776            (instance, db_name)
777        };
778
779        let dump_data = runtime
780            .dump_database(
781                db_instance_identifier,
782                &instance_for_snapshot.engine,
783                &instance_for_snapshot.master_username,
784                &instance_for_snapshot.master_user_password,
785                &db_name,
786            )
787            .await
788            .map_err(runtime_error_to_service_error)?;
789
790        let mut accounts = self.state.write();
791        let state = accounts.get_or_create(account_id);
792
793        if state.snapshots.contains_key(snapshot_id) {
794            return Err(AwsServiceError::aws_error(
795                StatusCode::CONFLICT,
796                "DBSnapshotAlreadyExists",
797                format!("DBSnapshot {snapshot_id} already exists."),
798            ));
799        }
800
801        let snapshot_arn = state.db_snapshot_arn(snapshot_id);
802
803        let snapshot = DbSnapshot {
804            db_snapshot_identifier: snapshot_id.to_string(),
805            db_snapshot_arn: snapshot_arn,
806            db_instance_identifier: db_instance_identifier.to_string(),
807            snapshot_create_time: Utc::now(),
808            engine: instance_for_snapshot.engine.clone(),
809            engine_version: instance_for_snapshot.engine_version.clone(),
810            allocated_storage: instance_for_snapshot.allocated_storage,
811            status: "available".to_string(),
812            port: instance_for_snapshot.port,
813            master_username: instance_for_snapshot.master_username.clone(),
814            db_name: instance_for_snapshot.db_name.clone(),
815            dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
816            snapshot_type: "manual".to_string(),
817            master_user_password: instance_for_snapshot.master_user_password.clone(),
818            tags: Vec::new(),
819            dump_data,
820        };
821
822        state.snapshots.insert(snapshot_id.to_string(), snapshot);
823        Ok(())
824    }
825
826    fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
827        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
828        let db_instance_class = optional_param(request, "DBInstanceClass");
829        let deletion_protection =
830            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?;
831        let apply_immediately =
832            parse_optional_bool(optional_param(request, "ApplyImmediately").as_deref())?;
833
834        // Parse VPC security group IDs - only if at least one is provided
835        let vpc_security_group_ids = {
836            let mut ids = Vec::new();
837            for index in 1.. {
838                let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
839                match optional_param(request, &sg_id_name) {
840                    Some(sg_id) => ids.push(sg_id),
841                    None => break,
842                }
843            }
844            if ids.is_empty() {
845                None
846            } else {
847                Some(ids)
848            }
849        };
850
851        if db_instance_class.is_none()
852            && deletion_protection.is_none()
853            && vpc_security_group_ids.is_none()
854        {
855            return Err(AwsServiceError::aws_error(
856                StatusCode::BAD_REQUEST,
857                "InvalidParameterCombination",
858                "At least one supported mutable field must be provided.",
859            ));
860        }
861        if let Some(ref class) = db_instance_class {
862            validate_db_instance_class(class)?;
863        }
864
865        let mut accounts = self.state.write();
866        let state = accounts.get_or_create(&request.account_id);
867        let instance = state
868            .instances
869            .get_mut(&db_instance_identifier)
870            .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
871
872        // If ApplyImmediately is false, stage changes as pending
873        if apply_immediately == Some(false) {
874            let pending = instance
875                .pending_modified_values
876                .get_or_insert(Default::default());
877            if let Some(class) = db_instance_class {
878                pending.db_instance_class = Some(class);
879            }
880            // Note: deletion_protection and vpc_security_group_ids are applied immediately
881            // regardless of ApplyImmediately flag (per AWS behavior)
882            if let Some(deletion_protection) = deletion_protection {
883                instance.deletion_protection = deletion_protection;
884            }
885            if let Some(security_group_ids) = vpc_security_group_ids {
886                instance.vpc_security_group_ids = security_group_ids;
887            }
888        } else {
889            // Apply immediately (default behavior)
890            if let Some(class) = db_instance_class {
891                instance.db_instance_class = class;
892            }
893            if let Some(deletion_protection) = deletion_protection {
894                instance.deletion_protection = deletion_protection;
895            }
896            if let Some(security_group_ids) = vpc_security_group_ids {
897                instance.vpc_security_group_ids = security_group_ids;
898            }
899        }
900        let instance_arn = instance.db_instance_arn.clone();
901        let xml = query_response_xml(
902            "ModifyDBInstance",
903            RDS_NS,
904            &format!(
905                "<DBInstance>{}</DBInstance>",
906                db_instance_xml(instance, Some("modifying"))
907            ),
908            &request.request_id,
909        );
910        drop(accounts);
911
912        self.emit_event(
913            RdsSourceType::DbInstance,
914            &db_instance_identifier,
915            &instance_arn,
916            "RDS-EVENT-0014",
917            &["configuration change"],
918            "DB instance was modified",
919        );
920
921        Ok(AwsResponse::xml(StatusCode::OK, xml))
922    }
923
924    async fn reboot_db_instance(
925        &self,
926        request: &AwsRequest,
927    ) -> Result<AwsResponse, AwsServiceError> {
928        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
929        let force_failover =
930            parse_optional_bool(optional_param(request, "ForceFailover").as_deref())?;
931        if force_failover == Some(true) {
932            return Err(AwsServiceError::aws_error(
933                StatusCode::BAD_REQUEST,
934                "InvalidParameterCombination",
935                "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
936            ));
937        }
938
939        let instance = {
940            let accounts = self.state.read();
941            let empty = RdsState::new(&request.account_id, &request.region);
942            let state = accounts.get(&request.account_id).unwrap_or(&empty);
943            state
944                .instances
945                .get(&db_instance_identifier)
946                .cloned()
947                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
948        };
949
950        let runtime = self.require_runtime()?;
951
952        let running = runtime
953            .restart_container(
954                &db_instance_identifier,
955                &instance.engine,
956                &instance.master_username,
957                &instance.master_user_password,
958                instance
959                    .db_name
960                    .as_deref()
961                    .unwrap_or(default_db_name(&instance.engine)),
962            )
963            .await
964            .map_err(runtime_error_to_service_error)?;
965
966        let instance = {
967            let mut accounts = self.state.write();
968            let state = accounts.get_or_create(&request.account_id);
969            let instance = state
970                .instances
971                .get_mut(&db_instance_identifier)
972                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
973            instance.host_port = running.host_port;
974            instance.port = i32::from(running.host_port);
975
976            // Apply any pending modifications
977            if let Some(pending) = instance.pending_modified_values.take() {
978                if let Some(class) = pending.db_instance_class {
979                    instance.db_instance_class = class;
980                }
981                if let Some(allocated_storage) = pending.allocated_storage {
982                    instance.allocated_storage = allocated_storage;
983                }
984                if let Some(backup_retention_period) = pending.backup_retention_period {
985                    instance.backup_retention_period = backup_retention_period;
986                }
987                if let Some(multi_az) = pending.multi_az {
988                    instance.multi_az = multi_az;
989                }
990                if let Some(engine_version) = pending.engine_version {
991                    instance.engine_version = engine_version;
992                }
993                if let Some(master_user_password) = pending.master_user_password {
994                    instance.master_user_password = master_user_password;
995                }
996            }
997
998            instance.clone()
999        };
1000
1001        self.emit_event(
1002            RdsSourceType::DbInstance,
1003            &db_instance_identifier,
1004            &instance.db_instance_arn,
1005            "RDS-EVENT-0006",
1006            &["availability"],
1007            "DB instance restarted",
1008        );
1009
1010        Ok(AwsResponse::xml(
1011            StatusCode::OK,
1012            query_response_xml(
1013                "RebootDBInstance",
1014                RDS_NS,
1015                &format!(
1016                    "<DBInstance>{}</DBInstance>",
1017                    db_instance_xml(&instance, Some("rebooting"))
1018                ),
1019                &request.request_id,
1020            ),
1021        ))
1022    }
1023
1024    fn describe_db_engine_versions(
1025        &self,
1026        request: &AwsRequest,
1027    ) -> Result<AwsResponse, AwsServiceError> {
1028        let engine = optional_param(request, "Engine");
1029        let engine_version = optional_param(request, "EngineVersion");
1030        let family = optional_param(request, "DBParameterGroupFamily");
1031        let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
1032
1033        let mut versions = filter_engine_versions(
1034            &default_engine_versions(),
1035            &engine,
1036            &engine_version,
1037            &family,
1038        );
1039
1040        if default_only.unwrap_or(false) {
1041            versions.truncate(1);
1042        }
1043
1044        Ok(AwsResponse::xml(
1045            StatusCode::OK,
1046            query_response_xml(
1047                "DescribeDBEngineVersions",
1048                RDS_NS,
1049                &format!(
1050                    "<DBEngineVersions>{}</DBEngineVersions>",
1051                    versions.iter().map(engine_version_xml).collect::<String>()
1052                ),
1053                &request.request_id,
1054            ),
1055        ))
1056    }
1057
1058    fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1059        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1060        let marker = optional_param(request, "Marker");
1061        let max_records = optional_param(request, "MaxRecords");
1062
1063        let accounts = self.state.read();
1064        let empty = RdsState::new(&request.account_id, &request.region);
1065        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1066
1067        // If specific identifier requested, return just that one (no pagination)
1068        if let Some(identifier) = db_instance_identifier {
1069            let instance = state
1070                .instances
1071                .get(&identifier)
1072                .cloned()
1073                .ok_or_else(|| db_instance_not_found(&identifier))?;
1074
1075            return Ok(AwsResponse::xml(
1076                StatusCode::OK,
1077                query_response_xml(
1078                    "DescribeDBInstances",
1079                    RDS_NS,
1080                    &format!(
1081                        "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
1082                        db_instance_xml(&instance, None)
1083                    ),
1084                    &request.request_id,
1085                ),
1086            ));
1087        }
1088
1089        // Get all instances sorted by created_at, then identifier
1090        let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
1091        instances.sort_by(|a, b| {
1092            a.created_at
1093                .cmp(&b.created_at)
1094                .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
1095        });
1096
1097        // Apply pagination
1098        let paginated = paginate(instances, marker, max_records, |inst| {
1099            &inst.db_instance_identifier
1100        })?;
1101
1102        let marker_xml = paginated
1103            .next_marker
1104            .as_ref()
1105            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1106            .unwrap_or_default();
1107
1108        Ok(AwsResponse::xml(
1109            StatusCode::OK,
1110            query_response_xml(
1111                "DescribeDBInstances",
1112                RDS_NS,
1113                &format!(
1114                    "<DBInstances>{}</DBInstances>{}",
1115                    paginated
1116                        .items
1117                        .iter()
1118                        .map(|instance| {
1119                            format!(
1120                                "<DBInstance>{}</DBInstance>",
1121                                db_instance_xml(instance, None)
1122                            )
1123                        })
1124                        .collect::<String>(),
1125                    marker_xml
1126                ),
1127                &request.request_id,
1128            ),
1129        ))
1130    }
1131
1132    fn describe_orderable_db_instance_options(
1133        &self,
1134        request: &AwsRequest,
1135    ) -> Result<AwsResponse, AwsServiceError> {
1136        let engine = optional_param(request, "Engine");
1137        let engine_version = optional_param(request, "EngineVersion");
1138        let db_instance_class = optional_param(request, "DBInstanceClass");
1139        let license_model = optional_param(request, "LicenseModel");
1140        let vpc = parse_optional_bool(optional_param(request, "Vpc").as_deref())?;
1141
1142        let options = filter_orderable_options(
1143            &default_orderable_options(),
1144            &engine,
1145            &engine_version,
1146            &db_instance_class,
1147            &license_model,
1148            vpc,
1149        );
1150
1151        Ok(AwsResponse::xml(
1152            StatusCode::OK,
1153            query_response_xml(
1154                "DescribeOrderableDBInstanceOptions",
1155                RDS_NS,
1156                &format!(
1157                    "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
1158                    options.iter().map(orderable_option_xml).collect::<String>()
1159                ),
1160                &request.request_id,
1161            ),
1162        ))
1163    }
1164
1165    fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1166        let resource_name = required_param(request, "ResourceName")?;
1167        let tags = parse_tags(request)?;
1168
1169        if tags.is_empty() {
1170            return Err(AwsServiceError::aws_error(
1171                StatusCode::BAD_REQUEST,
1172                "MissingParameter",
1173                "The request must contain the parameter Tags.",
1174            ));
1175        }
1176
1177        let mut accounts = self.state.write();
1178        let state = accounts.get_or_create(&request.account_id);
1179        let instance = find_instance_by_arn_mut(state, &resource_name)?;
1180        merge_tags(&mut instance.tags, &tags);
1181
1182        Ok(AwsResponse::xml(
1183            StatusCode::OK,
1184            query_response_xml("AddTagsToResource", RDS_NS, "", &request.request_id),
1185        ))
1186    }
1187
1188    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1189        let resource_name = required_param(request, "ResourceName")?;
1190        if query_param_prefix_exists(request, "Filters.") {
1191            return Err(AwsServiceError::aws_error(
1192                StatusCode::BAD_REQUEST,
1193                "InvalidParameterValue",
1194                "Filters are not yet supported for ListTagsForResource.",
1195            ));
1196        }
1197
1198        let accounts = self.state.read();
1199        let empty = RdsState::new(&request.account_id, &request.region);
1200        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1201        let instance = find_instance_by_arn(state, &resource_name)?;
1202        let tag_xml = instance.tags.iter().map(tag_xml).collect::<String>();
1203
1204        Ok(AwsResponse::xml(
1205            StatusCode::OK,
1206            query_response_xml(
1207                "ListTagsForResource",
1208                RDS_NS,
1209                &format!("<TagList>{tag_xml}</TagList>"),
1210                &request.request_id,
1211            ),
1212        ))
1213    }
1214
1215    fn remove_tags_from_resource(
1216        &self,
1217        request: &AwsRequest,
1218    ) -> Result<AwsResponse, AwsServiceError> {
1219        let resource_name = required_param(request, "ResourceName")?;
1220        let tag_keys = parse_tag_keys(request)?;
1221
1222        if tag_keys.is_empty() {
1223            return Err(AwsServiceError::aws_error(
1224                StatusCode::BAD_REQUEST,
1225                "MissingParameter",
1226                "The request must contain the parameter TagKeys.",
1227            ));
1228        }
1229
1230        let mut accounts = self.state.write();
1231        let state = accounts.get_or_create(&request.account_id);
1232        let instance = find_instance_by_arn_mut(state, &resource_name)?;
1233        instance
1234            .tags
1235            .retain(|tag| !tag_keys.iter().any(|key| key == &tag.key));
1236
1237        Ok(AwsResponse::xml(
1238            StatusCode::OK,
1239            query_response_xml("RemoveTagsFromResource", RDS_NS, "", &request.request_id),
1240        ))
1241    }
1242
1243    async fn create_db_snapshot(
1244        &self,
1245        request: &AwsRequest,
1246    ) -> Result<AwsResponse, AwsServiceError> {
1247        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1248        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1249
1250        let runtime = self.runtime.as_ref().ok_or_else(|| {
1251            AwsServiceError::aws_error(
1252                StatusCode::SERVICE_UNAVAILABLE,
1253                "InvalidParameterValue",
1254                "Docker/Podman is required for RDS snapshots but is not available",
1255            )
1256        })?;
1257
1258        let (instance, db_name) = {
1259            let accounts = self.state.read();
1260            let empty = RdsState::new(&request.account_id, &request.region);
1261            let state = accounts.get(&request.account_id).unwrap_or(&empty);
1262
1263            if state.snapshots.contains_key(&db_snapshot_identifier) {
1264                return Err(AwsServiceError::aws_error(
1265                    StatusCode::CONFLICT,
1266                    "DBSnapshotAlreadyExists",
1267                    format!("DBSnapshot {db_snapshot_identifier} already exists."),
1268                ));
1269            }
1270
1271            let instance = state
1272                .instances
1273                .get(&db_instance_identifier)
1274                .cloned()
1275                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1276
1277            let default_db = default_db_name(&instance.engine);
1278            let db_name = instance
1279                .db_name
1280                .as_deref()
1281                .unwrap_or(default_db)
1282                .to_string();
1283
1284            (instance, db_name)
1285        };
1286
1287        let dump_data = runtime
1288            .dump_database(
1289                &db_instance_identifier,
1290                &instance.engine,
1291                &instance.master_username,
1292                &instance.master_user_password,
1293                &db_name,
1294            )
1295            .await
1296            .map_err(runtime_error_to_service_error)?;
1297
1298        let mut accounts = self.state.write();
1299        let state = accounts.get_or_create(&request.account_id);
1300
1301        if state.snapshots.contains_key(&db_snapshot_identifier) {
1302            return Err(AwsServiceError::aws_error(
1303                StatusCode::CONFLICT,
1304                "DBSnapshotAlreadyExists",
1305                format!("DBSnapshot {db_snapshot_identifier} already exists."),
1306            ));
1307        }
1308
1309        let snapshot = DbSnapshot {
1310            db_snapshot_identifier: db_snapshot_identifier.clone(),
1311            db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
1312            db_instance_identifier: instance.db_instance_identifier.clone(),
1313            snapshot_create_time: Utc::now(),
1314            engine: instance.engine.clone(),
1315            engine_version: instance.engine_version.clone(),
1316            allocated_storage: instance.allocated_storage,
1317            status: "available".to_string(),
1318            port: instance.port,
1319            master_username: instance.master_username.clone(),
1320            db_name: instance.db_name.clone(),
1321            dbi_resource_id: instance.dbi_resource_id.clone(),
1322            snapshot_type: "manual".to_string(),
1323            master_user_password: instance.master_user_password.clone(),
1324            tags: Vec::new(),
1325            dump_data,
1326        };
1327
1328        state
1329            .snapshots
1330            .insert(db_snapshot_identifier.clone(), snapshot.clone());
1331        let snapshot_arn = snapshot.db_snapshot_arn.clone();
1332        drop(accounts);
1333
1334        self.emit_event(
1335            RdsSourceType::DbSnapshot,
1336            &db_snapshot_identifier,
1337            &snapshot_arn,
1338            "RDS-EVENT-0042",
1339            &["creation"],
1340            "Manual snapshot created",
1341        );
1342
1343        Ok(AwsResponse::xml(
1344            StatusCode::OK,
1345            query_response_xml(
1346                "CreateDBSnapshot",
1347                RDS_NS,
1348                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1349                &request.request_id,
1350            ),
1351        ))
1352    }
1353
1354    fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1355        let db_snapshot_identifier = optional_param(request, "DBSnapshotIdentifier");
1356        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1357        let marker = optional_param(request, "Marker");
1358        let max_records = optional_param(request, "MaxRecords");
1359
1360        if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
1361            return Err(AwsServiceError::aws_error(
1362                StatusCode::BAD_REQUEST,
1363                "InvalidParameterCombination",
1364                "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
1365            ));
1366        }
1367
1368        let accounts = self.state.read();
1369        let empty = RdsState::new(&request.account_id, &request.region);
1370        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1371
1372        // If specific snapshot requested, return just that one (no pagination)
1373        if let Some(snapshot_id) = db_snapshot_identifier {
1374            let snapshot = state
1375                .snapshots
1376                .get(&snapshot_id)
1377                .cloned()
1378                .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
1379
1380            return Ok(AwsResponse::xml(
1381                StatusCode::OK,
1382                query_response_xml(
1383                    "DescribeDBSnapshots",
1384                    RDS_NS,
1385                    &format!(
1386                        "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
1387                        db_snapshot_xml(&snapshot)
1388                    ),
1389                    &request.request_id,
1390                ),
1391            ));
1392        }
1393
1394        // Get snapshots, filtered by instance identifier if provided
1395        let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
1396            state
1397                .snapshots
1398                .values()
1399                .filter(|s| s.db_instance_identifier == instance_id)
1400                .cloned()
1401                .collect()
1402        } else {
1403            state.snapshots.values().cloned().collect()
1404        };
1405
1406        // Sort by creation time, then identifier
1407        snapshots.sort_by(|a, b| {
1408            a.snapshot_create_time
1409                .cmp(&b.snapshot_create_time)
1410                .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
1411        });
1412
1413        // Apply pagination
1414        let paginated = paginate(snapshots, marker, max_records, |snap| {
1415            &snap.db_snapshot_identifier
1416        })?;
1417
1418        let marker_xml = paginated
1419            .next_marker
1420            .as_ref()
1421            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1422            .unwrap_or_default();
1423
1424        Ok(AwsResponse::xml(
1425            StatusCode::OK,
1426            query_response_xml(
1427                "DescribeDBSnapshots",
1428                RDS_NS,
1429                &format!(
1430                    "<DBSnapshots>{}</DBSnapshots>{}",
1431                    paginated
1432                        .items
1433                        .iter()
1434                        .map(|snapshot| format!(
1435                            "<DBSnapshot>{}</DBSnapshot>",
1436                            db_snapshot_xml(snapshot)
1437                        ))
1438                        .collect::<String>(),
1439                    marker_xml
1440                ),
1441                &request.request_id,
1442            ),
1443        ))
1444    }
1445
1446    fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1447        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1448
1449        let mut accounts = self.state.write();
1450        let state = accounts.get_or_create(&request.account_id);
1451
1452        let snapshot = state
1453            .snapshots
1454            .remove(&db_snapshot_identifier)
1455            .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1456        let snapshot_arn = snapshot.db_snapshot_arn.clone();
1457        drop(accounts);
1458
1459        self.emit_event(
1460            RdsSourceType::DbSnapshot,
1461            &db_snapshot_identifier,
1462            &snapshot_arn,
1463            "RDS-EVENT-0041",
1464            &["deletion"],
1465            "Manual snapshot deleted",
1466        );
1467
1468        Ok(AwsResponse::xml(
1469            StatusCode::OK,
1470            query_response_xml(
1471                "DeleteDBSnapshot",
1472                RDS_NS,
1473                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1474                &request.request_id,
1475            ),
1476        ))
1477    }
1478
1479    async fn restore_db_instance_from_db_snapshot(
1480        &self,
1481        request: &AwsRequest,
1482    ) -> Result<AwsResponse, AwsServiceError> {
1483        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1484        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1485        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
1486
1487        let runtime = self.require_runtime()?;
1488
1489        let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
1490            let mut accounts = self.state.write();
1491            let state = accounts.get_or_create(&request.account_id);
1492
1493            if !state.begin_instance_creation(&db_instance_identifier) {
1494                return Err(AwsServiceError::aws_error(
1495                    StatusCode::CONFLICT,
1496                    "DBInstanceAlreadyExists",
1497                    format!("DBInstance {db_instance_identifier} already exists."),
1498                ));
1499            }
1500
1501            let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
1502                Some(s) => s,
1503                None => {
1504                    state.cancel_instance_creation(&db_instance_identifier);
1505                    return Err(db_snapshot_not_found(&db_snapshot_identifier));
1506                }
1507            };
1508
1509            let dbi_resource_id = state.next_dbi_resource_id();
1510            let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
1511            let created_at = Utc::now();
1512
1513            (snapshot, dbi_resource_id, db_instance_arn, created_at)
1514        };
1515
1516        let db_name = snapshot
1517            .db_name
1518            .as_deref()
1519            .unwrap_or(default_db_name(&snapshot.engine));
1520        let running = match runtime
1521            .ensure_postgres(
1522                &db_instance_identifier,
1523                &snapshot.engine,
1524                &snapshot.engine_version,
1525                &snapshot.master_username,
1526                &snapshot.master_user_password,
1527                db_name,
1528                &request.account_id,
1529                &request.region,
1530            )
1531            .await
1532        {
1533            Ok(running) => running,
1534            Err(e) => {
1535                self.state
1536                    .write()
1537                    .get_or_create(&request.account_id)
1538                    .cancel_instance_creation(&db_instance_identifier);
1539                return Err(runtime_error_to_service_error(e));
1540            }
1541        };
1542
1543        if let Err(e) = runtime
1544            .restore_database(
1545                &db_instance_identifier,
1546                &snapshot.engine,
1547                &snapshot.master_username,
1548                &snapshot.master_user_password,
1549                db_name,
1550                &snapshot.dump_data,
1551            )
1552            .await
1553        {
1554            self.state
1555                .write()
1556                .get_or_create(&request.account_id)
1557                .cancel_instance_creation(&db_instance_identifier);
1558            runtime.stop_container(&db_instance_identifier).await;
1559            return Err(runtime_error_to_service_error(e));
1560        }
1561
1562        let instance = build_restored_instance(
1563            &db_instance_identifier,
1564            db_instance_arn,
1565            dbi_resource_id,
1566            created_at,
1567            vpc_security_group_ids,
1568            &snapshot,
1569            &running,
1570        );
1571
1572        self.state
1573            .write()
1574            .get_or_create(&request.account_id)
1575            .finish_instance_creation(instance.clone());
1576
1577        self.emit_event(
1578            RdsSourceType::DbInstance,
1579            &db_instance_identifier,
1580            &instance.db_instance_arn,
1581            "RDS-EVENT-0043",
1582            &["creation"],
1583            "DB instance restored from snapshot",
1584        );
1585
1586        Ok(AwsResponse::xml(
1587            StatusCode::OK,
1588            query_response_xml(
1589                "RestoreDBInstanceFromDBSnapshot",
1590                RDS_NS,
1591                &format!(
1592                    "<DBInstance>{}</DBInstance>",
1593                    db_instance_xml(&instance, None)
1594                ),
1595                &request.request_id,
1596            ),
1597        ))
1598    }
1599
1600    async fn create_db_instance_read_replica(
1601        &self,
1602        request: &AwsRequest,
1603    ) -> Result<AwsResponse, AwsServiceError> {
1604        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1605        let source_db_instance_identifier = required_param(request, "SourceDBInstanceIdentifier")?;
1606
1607        let runtime = self.runtime.as_ref().ok_or_else(|| {
1608            AwsServiceError::aws_error(
1609                StatusCode::SERVICE_UNAVAILABLE,
1610                "InvalidParameterValue",
1611                "Docker/Podman is required for RDS read replicas but is not available",
1612            )
1613        })?;
1614
1615        let (source_instance, db_name) = {
1616            let mut accounts = self.state.write();
1617            let state = accounts.get_or_create(&request.account_id);
1618
1619            if !state.begin_instance_creation(&db_instance_identifier) {
1620                return Err(AwsServiceError::aws_error(
1621                    StatusCode::CONFLICT,
1622                    "DBInstanceAlreadyExists",
1623                    format!("DBInstance {db_instance_identifier} already exists."),
1624                ));
1625            }
1626
1627            let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
1628            {
1629                Some(inst) => inst,
1630                None => {
1631                    state.cancel_instance_creation(&db_instance_identifier);
1632                    return Err(db_instance_not_found(&source_db_instance_identifier));
1633                }
1634            };
1635
1636            let default_db = default_db_name(&source_instance.engine);
1637            let db_name = source_instance
1638                .db_name
1639                .as_deref()
1640                .unwrap_or(default_db)
1641                .to_string();
1642
1643            (source_instance, db_name)
1644        };
1645
1646        let dump_data = match runtime
1647            .dump_database(
1648                &source_db_instance_identifier,
1649                &source_instance.engine,
1650                &source_instance.master_username,
1651                &source_instance.master_user_password,
1652                &db_name,
1653            )
1654            .await
1655        {
1656            Ok(data) => data,
1657            Err(e) => {
1658                self.state
1659                    .write()
1660                    .get_or_create(&request.account_id)
1661                    .cancel_instance_creation(&db_instance_identifier);
1662                return Err(runtime_error_to_service_error(e));
1663            }
1664        };
1665
1666        let (dbi_resource_id, db_instance_arn) = {
1667            let accounts = self.state.read();
1668            let empty = RdsState::new(&request.account_id, &request.region);
1669            let s = accounts.get(&request.account_id).unwrap_or(&empty);
1670            (
1671                s.next_dbi_resource_id(),
1672                s.db_instance_arn(&db_instance_identifier),
1673            )
1674        };
1675        let created_at = Utc::now();
1676
1677        let running = match runtime
1678            .ensure_postgres(
1679                &db_instance_identifier,
1680                &source_instance.engine,
1681                &source_instance.engine_version,
1682                &source_instance.master_username,
1683                &source_instance.master_user_password,
1684                &db_name,
1685                &request.account_id,
1686                &request.region,
1687            )
1688            .await
1689        {
1690            Ok(running) => running,
1691            Err(e) => {
1692                self.state
1693                    .write()
1694                    .get_or_create(&request.account_id)
1695                    .cancel_instance_creation(&db_instance_identifier);
1696                return Err(runtime_error_to_service_error(e));
1697            }
1698        };
1699
1700        if let Err(e) = runtime
1701            .restore_database(
1702                &db_instance_identifier,
1703                &source_instance.engine,
1704                &source_instance.master_username,
1705                &source_instance.master_user_password,
1706                &db_name,
1707                &dump_data,
1708            )
1709            .await
1710        {
1711            self.state
1712                .write()
1713                .get_or_create(&request.account_id)
1714                .cancel_instance_creation(&db_instance_identifier);
1715            runtime.stop_container(&db_instance_identifier).await;
1716            return Err(runtime_error_to_service_error(e));
1717        }
1718
1719        let replica = build_read_replica_instance(
1720            &db_instance_identifier,
1721            db_instance_arn,
1722            dbi_resource_id,
1723            created_at,
1724            &source_db_instance_identifier,
1725            &source_instance,
1726            &running,
1727        );
1728
1729        let source_missing = {
1730            let mut accounts = self.state.write();
1731            let state = accounts.get_or_create(&request.account_id);
1732            match state.instances.get_mut(&source_db_instance_identifier) {
1733                Some(source) => {
1734                    source
1735                        .read_replica_db_instance_identifiers
1736                        .push(db_instance_identifier.clone());
1737                    state.finish_instance_creation(replica.clone());
1738                    false
1739                }
1740                None => {
1741                    state.cancel_instance_creation(&db_instance_identifier);
1742                    true
1743                }
1744            }
1745        };
1746
1747        if source_missing {
1748            runtime.stop_container(&db_instance_identifier).await;
1749            return Err(db_instance_not_found(&source_db_instance_identifier));
1750        }
1751
1752        self.emit_event(
1753            RdsSourceType::DbInstance,
1754            &db_instance_identifier,
1755            &replica.db_instance_arn,
1756            "RDS-EVENT-0005",
1757            &["creation", "read replica"],
1758            "Read replica DB instance created",
1759        );
1760
1761        Ok(AwsResponse::xml(
1762            StatusCode::OK,
1763            query_response_xml(
1764                "CreateDBInstanceReadReplica",
1765                RDS_NS,
1766                &format!(
1767                    "<DBInstance>{}</DBInstance>",
1768                    db_instance_xml(&replica, None)
1769                ),
1770                &request.request_id,
1771            ),
1772        ))
1773    }
1774
1775    fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1776        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1777        let db_subnet_group_description = required_param(request, "DBSubnetGroupDescription")?;
1778        let subnet_ids = parse_subnet_ids(request)?;
1779
1780        if subnet_ids.is_empty() {
1781            return Err(AwsServiceError::aws_error(
1782                StatusCode::BAD_REQUEST,
1783                "InvalidParameterValue",
1784                "At least one subnet must be specified.",
1785            ));
1786        }
1787
1788        if subnet_ids.len() < 2 {
1789            return Err(AwsServiceError::aws_error(
1790                StatusCode::BAD_REQUEST,
1791                "DBSubnetGroupDoesNotCoverEnoughAZs",
1792                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1793            ));
1794        }
1795
1796        let mut accounts = self.state.write();
1797        let state = accounts.get_or_create(&request.account_id);
1798
1799        if state.subnet_groups.contains_key(&db_subnet_group_name) {
1800            return Err(AwsServiceError::aws_error(
1801                StatusCode::CONFLICT,
1802                "DBSubnetGroupAlreadyExists",
1803                format!("DBSubnetGroup {db_subnet_group_name} already exists."),
1804            ));
1805        }
1806
1807        let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
1808        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1809            .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
1810            .collect();
1811
1812        // Validate that subnets span at least 2 unique Availability Zones
1813        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1814        if unique_azs.len() < 2 {
1815            return Err(AwsServiceError::aws_error(
1816                StatusCode::BAD_REQUEST,
1817                "DBSubnetGroupDoesNotCoverEnoughAZs",
1818                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1819            ));
1820        }
1821
1822        let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
1823        let tags = parse_tags(request)?;
1824
1825        let subnet_group = DbSubnetGroup {
1826            db_subnet_group_name: db_subnet_group_name.clone(),
1827            db_subnet_group_arn,
1828            db_subnet_group_description,
1829            vpc_id,
1830            subnet_ids,
1831            subnet_availability_zones,
1832            tags,
1833        };
1834
1835        state
1836            .subnet_groups
1837            .insert(db_subnet_group_name, subnet_group.clone());
1838
1839        Ok(AwsResponse::xml(
1840            StatusCode::OK,
1841            query_response_xml(
1842                "CreateDBSubnetGroup",
1843                RDS_NS,
1844                &format!(
1845                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1846                    db_subnet_group_xml(&subnet_group)
1847                ),
1848                &request.request_id,
1849            ),
1850        ))
1851    }
1852
1853    fn describe_db_subnet_groups(
1854        &self,
1855        request: &AwsRequest,
1856    ) -> Result<AwsResponse, AwsServiceError> {
1857        let db_subnet_group_name = optional_param(request, "DBSubnetGroupName");
1858        let marker = optional_param(request, "Marker");
1859        let max_records = optional_param(request, "MaxRecords");
1860
1861        let accounts = self.state.read();
1862        let empty = RdsState::new(&request.account_id, &request.region);
1863        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1864
1865        // If specific subnet group requested, return just that one (no pagination)
1866        if let Some(name) = db_subnet_group_name {
1867            let sg = state.subnet_groups.get(&name).ok_or_else(|| {
1868                AwsServiceError::aws_error(
1869                    StatusCode::NOT_FOUND,
1870                    "DBSubnetGroupNotFoundFault",
1871                    format!("DBSubnetGroup {} not found.", name),
1872                )
1873            })?;
1874
1875            return Ok(AwsResponse::xml(
1876                StatusCode::OK,
1877                query_response_xml(
1878                    "DescribeDBSubnetGroups",
1879                    RDS_NS,
1880                    &format!(
1881                        "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
1882                        db_subnet_group_xml(sg)
1883                    ),
1884                    &request.request_id,
1885                ),
1886            ));
1887        }
1888
1889        // Get all subnet groups sorted by name
1890        let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
1891        subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
1892
1893        // Apply pagination
1894        let paginated = paginate(subnet_groups, marker, max_records, |sg| {
1895            &sg.db_subnet_group_name
1896        })?;
1897
1898        let marker_xml = paginated
1899            .next_marker
1900            .as_ref()
1901            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1902            .unwrap_or_default();
1903
1904        let body = paginated
1905            .items
1906            .iter()
1907            .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
1908            .collect::<Vec<_>>()
1909            .join("");
1910
1911        Ok(AwsResponse::xml(
1912            StatusCode::OK,
1913            query_response_xml(
1914                "DescribeDBSubnetGroups",
1915                RDS_NS,
1916                &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
1917                &request.request_id,
1918            ),
1919        ))
1920    }
1921
1922    fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1923        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1924
1925        let mut accounts = self.state.write();
1926        let state = accounts.get_or_create(&request.account_id);
1927
1928        if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
1929            return Err(AwsServiceError::aws_error(
1930                StatusCode::NOT_FOUND,
1931                "DBSubnetGroupNotFoundFault",
1932                format!("DBSubnetGroup {db_subnet_group_name} not found."),
1933            ));
1934        }
1935
1936        Ok(AwsResponse::xml(
1937            StatusCode::OK,
1938            query_response_xml("DeleteDBSubnetGroup", RDS_NS, "", &request.request_id),
1939        ))
1940    }
1941
1942    fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1943        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1944        let subnet_ids = parse_subnet_ids(request)?;
1945
1946        if subnet_ids.is_empty() {
1947            return Err(AwsServiceError::aws_error(
1948                StatusCode::BAD_REQUEST,
1949                "InvalidParameterValue",
1950                "At least one subnet must be specified.",
1951            ));
1952        }
1953
1954        if subnet_ids.len() < 2 {
1955            return Err(AwsServiceError::aws_error(
1956                StatusCode::BAD_REQUEST,
1957                "DBSubnetGroupDoesNotCoverEnoughAZs",
1958                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1959            ));
1960        }
1961
1962        let mut accounts = self.state.write();
1963        let state = accounts.get_or_create(&request.account_id);
1964
1965        let region = state.region.clone();
1966
1967        let subnet_group = state
1968            .subnet_groups
1969            .get_mut(&db_subnet_group_name)
1970            .ok_or_else(|| {
1971                AwsServiceError::aws_error(
1972                    StatusCode::NOT_FOUND,
1973                    "DBSubnetGroupNotFoundFault",
1974                    format!("DBSubnetGroup {db_subnet_group_name} not found."),
1975                )
1976            })?;
1977
1978        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1979            .map(|i| format!("{}{}", &region, char::from(b'a' + (i % 6) as u8)))
1980            .collect();
1981
1982        // Validate that subnets span at least 2 unique Availability Zones
1983        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1984        if unique_azs.len() < 2 {
1985            return Err(AwsServiceError::aws_error(
1986                StatusCode::BAD_REQUEST,
1987                "DBSubnetGroupDoesNotCoverEnoughAZs",
1988                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1989            ));
1990        }
1991
1992        subnet_group.subnet_ids = subnet_ids;
1993        subnet_group.subnet_availability_zones = subnet_availability_zones;
1994
1995        let subnet_group_clone = subnet_group.clone();
1996
1997        Ok(AwsResponse::xml(
1998            StatusCode::OK,
1999            query_response_xml(
2000                "ModifyDBSubnetGroup",
2001                RDS_NS,
2002                &format!(
2003                    "<DBSubnetGroup>{}</DBSubnetGroup>",
2004                    db_subnet_group_xml(&subnet_group_clone)
2005                ),
2006                &request.request_id,
2007            ),
2008        ))
2009    }
2010
2011    fn create_db_parameter_group(
2012        &self,
2013        request: &AwsRequest,
2014    ) -> Result<AwsResponse, AwsServiceError> {
2015        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2016        let db_parameter_group_family = required_param(request, "DBParameterGroupFamily")?;
2017        let description = required_param(request, "Description")?;
2018
2019        // Validate parameter group family against supported engines and versions
2020        let valid_families = [
2021            "postgres16",
2022            "postgres15",
2023            "postgres14",
2024            "postgres13",
2025            "mysql8.0",
2026            "mysql5.7",
2027            "mariadb10.11",
2028            "mariadb10.6",
2029        ];
2030
2031        if !valid_families.contains(&db_parameter_group_family.as_str()) {
2032            return Err(AwsServiceError::aws_error(
2033                StatusCode::BAD_REQUEST,
2034                "InvalidParameterValue",
2035                format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
2036            ));
2037        }
2038
2039        let mut accounts = self.state.write();
2040        let state = accounts.get_or_create(&request.account_id);
2041
2042        if state
2043            .parameter_groups
2044            .contains_key(&db_parameter_group_name)
2045        {
2046            return Err(AwsServiceError::aws_error(
2047                StatusCode::CONFLICT,
2048                "DBParameterGroupAlreadyExists",
2049                format!("DBParameterGroup {db_parameter_group_name} already exists."),
2050            ));
2051        }
2052
2053        let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
2054        let tags = parse_tags(request)?;
2055
2056        let parameter_group = DbParameterGroup {
2057            db_parameter_group_name: db_parameter_group_name.clone(),
2058            db_parameter_group_arn,
2059            db_parameter_group_family,
2060            description,
2061            parameters: std::collections::BTreeMap::new(),
2062            tags,
2063        };
2064
2065        state
2066            .parameter_groups
2067            .insert(db_parameter_group_name, parameter_group.clone());
2068
2069        Ok(AwsResponse::xml(
2070            StatusCode::OK,
2071            query_response_xml(
2072                "CreateDBParameterGroup",
2073                RDS_NS,
2074                &format!(
2075                    "<DBParameterGroup>{}</DBParameterGroup>",
2076                    db_parameter_group_xml(&parameter_group)
2077                ),
2078                &request.request_id,
2079            ),
2080        ))
2081    }
2082
2083    fn describe_db_parameter_groups(
2084        &self,
2085        request: &AwsRequest,
2086    ) -> Result<AwsResponse, AwsServiceError> {
2087        let db_parameter_group_name = optional_param(request, "DBParameterGroupName");
2088        let marker = optional_param(request, "Marker");
2089        let max_records = optional_param(request, "MaxRecords");
2090
2091        let accounts = self.state.read();
2092        let empty = RdsState::new(&request.account_id, &request.region);
2093        let state = accounts.get(&request.account_id).unwrap_or(&empty);
2094
2095        // If specific parameter group requested, return just that one (no pagination)
2096        if let Some(name) = db_parameter_group_name {
2097            let pg = state.parameter_groups.get(&name).ok_or_else(|| {
2098                AwsServiceError::aws_error(
2099                    StatusCode::NOT_FOUND,
2100                    "DBParameterGroupNotFound",
2101                    format!("DBParameterGroup {} not found.", name),
2102                )
2103            })?;
2104
2105            return Ok(AwsResponse::xml(
2106                StatusCode::OK,
2107                query_response_xml(
2108                    "DescribeDBParameterGroups", RDS_NS,
2109                    &format!(
2110                        "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
2111                        db_parameter_group_xml(pg)
2112                    ),
2113                    &request.request_id,
2114                ),
2115            ));
2116        }
2117
2118        // Get all parameter groups sorted by name
2119        let mut parameter_groups: Vec<DbParameterGroup> =
2120            state.parameter_groups.values().cloned().collect();
2121        parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
2122
2123        // Apply pagination
2124        let paginated = paginate(parameter_groups, marker, max_records, |pg| {
2125            &pg.db_parameter_group_name
2126        })?;
2127
2128        let marker_xml = paginated
2129            .next_marker
2130            .as_ref()
2131            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
2132            .unwrap_or_default();
2133
2134        let body = paginated
2135            .items
2136            .iter()
2137            .map(|pg| {
2138                format!(
2139                    "<DBParameterGroup>{}</DBParameterGroup>",
2140                    db_parameter_group_xml(pg)
2141                )
2142            })
2143            .collect::<Vec<_>>()
2144            .join("");
2145
2146        Ok(AwsResponse::xml(
2147            StatusCode::OK,
2148            query_response_xml(
2149                "DescribeDBParameterGroups",
2150                RDS_NS,
2151                &format!(
2152                    "<DBParameterGroups>{}</DBParameterGroups>{}",
2153                    body, marker_xml
2154                ),
2155                &request.request_id,
2156            ),
2157        ))
2158    }
2159
2160    fn delete_db_parameter_group(
2161        &self,
2162        request: &AwsRequest,
2163    ) -> Result<AwsResponse, AwsServiceError> {
2164        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2165
2166        let mut accounts = self.state.write();
2167        let state = accounts.get_or_create(&request.account_id);
2168
2169        if db_parameter_group_name.starts_with("default.") {
2170            return Err(AwsServiceError::aws_error(
2171                StatusCode::BAD_REQUEST,
2172                "InvalidParameterValue",
2173                "Cannot delete default parameter groups.",
2174            ));
2175        }
2176
2177        if state
2178            .parameter_groups
2179            .remove(&db_parameter_group_name)
2180            .is_none()
2181        {
2182            return Err(AwsServiceError::aws_error(
2183                StatusCode::NOT_FOUND,
2184                "DBParameterGroupNotFound",
2185                format!("DBParameterGroup {db_parameter_group_name} not found."),
2186            ));
2187        }
2188
2189        Ok(AwsResponse::xml(
2190            StatusCode::OK,
2191            query_response_xml("DeleteDBParameterGroup", RDS_NS, "", &request.request_id),
2192        ))
2193    }
2194
2195    fn modify_db_parameter_group(
2196        &self,
2197        request: &AwsRequest,
2198    ) -> Result<AwsResponse, AwsServiceError> {
2199        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2200
2201        let mut accounts = self.state.write();
2202        let state = accounts.get_or_create(&request.account_id);
2203
2204        let parameter_group = state
2205            .parameter_groups
2206            .get_mut(&db_parameter_group_name)
2207            .ok_or_else(|| {
2208                AwsServiceError::aws_error(
2209                    StatusCode::NOT_FOUND,
2210                    "DBParameterGroupNotFound",
2211                    format!("DBParameterGroup {db_parameter_group_name} not found."),
2212                )
2213            })?;
2214
2215        if let Some(new_description) = optional_param(request, "Description") {
2216            parameter_group.description = new_description;
2217        }
2218
2219        let parameter_group_clone = parameter_group.clone();
2220
2221        Ok(AwsResponse::xml(
2222            StatusCode::OK,
2223            query_response_xml(
2224                "ModifyDBParameterGroup",
2225                RDS_NS,
2226                &format!(
2227                    "<DBParameterGroupName>{}</DBParameterGroupName>",
2228                    xml_escape(&parameter_group_clone.db_parameter_group_name)
2229                ),
2230                &request.request_id,
2231            ),
2232        ))
2233    }
2234}
2235
2236fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
2237    fakecloud_core::query::optional_query_param(req, name)
2238}
2239
2240fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
2241    fakecloud_core::query::required_query_param(req, name)
2242}
2243
2244fn required_i32_param(req: &AwsRequest, name: &str) -> Result<i32, AwsServiceError> {
2245    let value = required_param(req, name)?;
2246    value.parse::<i32>().map_err(|_| {
2247        AwsServiceError::aws_error(
2248            StatusCode::BAD_REQUEST,
2249            "InvalidParameterValue",
2250            format!("Parameter {name} must be a valid integer."),
2251        )
2252    })
2253}
2254
2255fn optional_i32_param(req: &AwsRequest, name: &str) -> Result<Option<i32>, AwsServiceError> {
2256    optional_param(req, name)
2257        .map(|value| {
2258            value.parse::<i32>().map_err(|_| {
2259                AwsServiceError::aws_error(
2260                    StatusCode::BAD_REQUEST,
2261                    "InvalidParameterValue",
2262                    format!("Parameter {name} must be a valid integer."),
2263                )
2264            })
2265        })
2266        .transpose()
2267}
2268
2269fn parse_tags(req: &AwsRequest) -> Result<Vec<RdsTag>, AwsServiceError> {
2270    let mut tags = Vec::new();
2271    for index in 1.. {
2272        let key_name = format!("Tags.Tag.{index}.Key");
2273        let value_name = format!("Tags.Tag.{index}.Value");
2274        let key = optional_param(req, &key_name);
2275        let value = optional_param(req, &value_name);
2276
2277        match (key, value) {
2278            (Some(key), Some(value)) => tags.push(RdsTag { key, value }),
2279            (None, None) => break,
2280            _ => {
2281                return Err(AwsServiceError::aws_error(
2282                    StatusCode::BAD_REQUEST,
2283                    "InvalidParameterValue",
2284                    "Each tag must include both Key and Value.",
2285                ));
2286            }
2287        }
2288    }
2289
2290    Ok(tags)
2291}
2292
2293fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
2294    let mut keys = Vec::new();
2295    for index in 1.. {
2296        let key_name = format!("TagKeys.member.{index}");
2297        match optional_param(req, &key_name) {
2298            Some(key) => keys.push(key),
2299            None => break,
2300        }
2301    }
2302
2303    Ok(keys)
2304}
2305
2306fn parse_subnet_ids(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
2307    let mut subnet_ids = Vec::new();
2308    for index in 1.. {
2309        let subnet_id_name = format!("SubnetIds.SubnetIdentifier.{index}");
2310        match optional_param(req, &subnet_id_name) {
2311            Some(subnet_id) => subnet_ids.push(subnet_id),
2312            None => break,
2313        }
2314    }
2315
2316    Ok(subnet_ids)
2317}
2318
2319fn parse_vpc_security_group_ids(req: &AwsRequest) -> Vec<String> {
2320    let mut security_group_ids = Vec::new();
2321    for index in 1.. {
2322        let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
2323        match optional_param(req, &sg_id_name) {
2324            Some(sg_id) => security_group_ids.push(sg_id),
2325            None => break,
2326        }
2327    }
2328
2329    // If no security groups provided, return a default one
2330    if security_group_ids.is_empty() {
2331        security_group_ids.push("sg-default".to_string());
2332    }
2333
2334    security_group_ids
2335}
2336
2337fn query_param_prefix_exists(req: &AwsRequest, prefix: &str) -> bool {
2338    req.query_params.keys().any(|key| key.starts_with(prefix))
2339}
2340
2341fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
2342    value
2343        .map(|raw| match raw {
2344            "true" | "True" | "TRUE" => Ok(true),
2345            "false" | "False" | "FALSE" => Ok(false),
2346            _ => Err(AwsServiceError::aws_error(
2347                StatusCode::BAD_REQUEST,
2348                "InvalidParameterValue",
2349                format!("Boolean parameter value '{raw}' is invalid."),
2350            )),
2351        })
2352        .transpose()
2353}
2354
2355struct PaginationResult<T> {
2356    items: Vec<T>,
2357    next_marker: Option<String>,
2358}
2359
2360fn paginate<T, F>(
2361    mut items: Vec<T>,
2362    marker: Option<String>,
2363    max_records: Option<String>,
2364    get_id: F,
2365) -> Result<PaginationResult<T>, AwsServiceError>
2366where
2367    F: Fn(&T) -> &str,
2368{
2369    // Parse max_records with default 100, max 100
2370    let max = if let Some(max_str) = max_records {
2371        let parsed = max_str.parse::<i32>().map_err(|_| {
2372            AwsServiceError::aws_error(
2373                StatusCode::BAD_REQUEST,
2374                "InvalidParameterValue",
2375                "MaxRecords must be a valid integer.",
2376            )
2377        })?;
2378        if !(1..=100).contains(&parsed) {
2379            return Err(AwsServiceError::aws_error(
2380                StatusCode::BAD_REQUEST,
2381                "InvalidParameterValue",
2382                "MaxRecords must be between 1 and 100.",
2383            ));
2384        }
2385        parsed as usize
2386    } else {
2387        100
2388    };
2389
2390    // Decode marker to get starting identifier
2391    let start_id = if let Some(encoded_marker) = marker {
2392        let decoded = BASE64.decode(encoded_marker.as_bytes()).map_err(|_| {
2393            AwsServiceError::aws_error(
2394                StatusCode::BAD_REQUEST,
2395                "InvalidParameterValue",
2396                "Marker is invalid.",
2397            )
2398        })?;
2399        let id = String::from_utf8(decoded).map_err(|_| {
2400            AwsServiceError::aws_error(
2401                StatusCode::BAD_REQUEST,
2402                "InvalidParameterValue",
2403                "Marker is invalid.",
2404            )
2405        })?;
2406        Some(id)
2407    } else {
2408        None
2409    };
2410
2411    // Find starting position
2412    let start_index = if let Some(ref start_id) = start_id {
2413        items
2414            .iter()
2415            .position(|item| get_id(item) == start_id)
2416            .map(|pos| pos + 1) // Start after the marker
2417            .unwrap_or(items.len()) // If not found, return empty result
2418    } else {
2419        0
2420    };
2421
2422    // Take items from start_index
2423    let total_items = items.len();
2424    let end_index = std::cmp::min(start_index + max, total_items);
2425    let paginated_items: Vec<T> = items.drain(start_index..end_index).collect();
2426
2427    // Create next marker if there are more items
2428    let next_marker = if end_index < total_items {
2429        paginated_items
2430            .last()
2431            .map(|item| BASE64.encode(get_id(item).as_bytes()))
2432    } else {
2433        None
2434    };
2435
2436    Ok(PaginationResult {
2437        items: paginated_items,
2438        next_marker,
2439    })
2440}
2441
2442fn validate_create_request(
2443    db_instance_identifier: &str,
2444    allocated_storage: i32,
2445    db_instance_class: &str,
2446    engine: &str,
2447    engine_version: &str,
2448    port: i32,
2449) -> Result<(), AwsServiceError> {
2450    if allocated_storage <= 0 {
2451        return Err(AwsServiceError::aws_error(
2452            StatusCode::BAD_REQUEST,
2453            "InvalidParameterValue",
2454            "AllocatedStorage must be greater than zero.",
2455        ));
2456    }
2457    if port <= 0 {
2458        return Err(AwsServiceError::aws_error(
2459            StatusCode::BAD_REQUEST,
2460            "InvalidParameterValue",
2461            "Port must be greater than zero.",
2462        ));
2463    }
2464    if !db_instance_identifier
2465        .chars()
2466        .all(|ch| ch.is_ascii_alphanumeric() || ch == '-')
2467    {
2468        return Err(AwsServiceError::aws_error(
2469            StatusCode::BAD_REQUEST,
2470            "InvalidParameterValue",
2471            "DBInstanceIdentifier must contain only alphanumeric characters or hyphens.",
2472        ));
2473    }
2474    // Validate engine
2475    let supported_engines = [
2476        "postgres",
2477        "mysql",
2478        "mariadb",
2479        "oracle-ee",
2480        "oracle-se2",
2481        "oracle-ee-cdb",
2482        "oracle-se2-cdb",
2483        "sqlserver-ee",
2484        "sqlserver-se",
2485        "sqlserver-ex",
2486        "sqlserver-web",
2487        "db2-se",
2488        "db2-ae",
2489    ];
2490    if !supported_engines.contains(&engine) {
2491        return Err(AwsServiceError::aws_error(
2492            StatusCode::BAD_REQUEST,
2493            "InvalidParameterValue",
2494            format!("Engine '{}' is not supported.", engine),
2495        ));
2496    }
2497
2498    // Validate engine version. The Oracle/SQL Server/Db2 lists track
2499    // the major-minor versions actually shipped by the upstream
2500    // dev-edition images (gvenzl/oracle-free 23, mssql-server 2022,
2501    // db2_community 11.5). Adding a new version here also requires
2502    // wiring the image tag in `RdsRuntime::ensure_postgres`.
2503    // Major versions ("8.0", "10.11", ...) are accepted alongside the
2504    // full `<major>.<minor>.<patch>` triplets — AWS RDS validates both
2505    // forms and the runtime resolves the matching prebuilt image regardless.
2506    let supported_versions = match engine {
2507        "postgres" => vec!["16", "15", "14", "13", "16.3", "15.5", "14.10", "13.13"],
2508        "mysql" => vec!["8.0", "8.0.35", "8.0.28", "5.7.44"],
2509        "mariadb" => vec!["10.6", "10.11", "11.4", "11.4.5", "10.11.6", "10.6.16"],
2510        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
2511            vec!["23.0.0", "21.0.0", "19.0.0"]
2512        }
2513        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
2514            vec!["16.00.4085.2.v1", "15.00.4322.2.v1"]
2515        }
2516        "db2-se" | "db2-ae" => vec!["11.5.9.0.sb00000000.r1", "11.5.8.0.sb00000000.r1"],
2517        _ => vec![],
2518    };
2519
2520    if !supported_versions.contains(&engine_version) {
2521        return Err(AwsServiceError::aws_error(
2522            StatusCode::BAD_REQUEST,
2523            "InvalidParameterValue",
2524            format!("EngineVersion '{engine_version}' is not supported yet."),
2525        ));
2526    }
2527    validate_db_instance_class(db_instance_class)?;
2528    Ok(())
2529}
2530
2531fn validate_db_instance_class(db_instance_class: &str) -> Result<(), AwsServiceError> {
2532    if !crate::state::SUPPORTED_INSTANCE_CLASSES.contains(&db_instance_class) {
2533        return Err(AwsServiceError::aws_error(
2534            StatusCode::BAD_REQUEST,
2535            "InvalidParameterValue",
2536            format!("DBInstanceClass '{}' is not supported.", db_instance_class),
2537        ));
2538    }
2539    Ok(())
2540}
2541
2542fn filter_engine_versions(
2543    versions: &[EngineVersionInfo],
2544    engine: &Option<String>,
2545    engine_version: &Option<String>,
2546    family: &Option<String>,
2547) -> Vec<EngineVersionInfo> {
2548    versions
2549        .iter()
2550        .filter(|candidate| {
2551            engine
2552                .as_ref()
2553                .is_none_or(|expected| candidate.engine == *expected)
2554        })
2555        .filter(|candidate| {
2556            engine_version
2557                .as_ref()
2558                .is_none_or(|expected| candidate.engine_version == *expected)
2559        })
2560        .filter(|candidate| {
2561            family
2562                .as_ref()
2563                .is_none_or(|expected| candidate.db_parameter_group_family == *expected)
2564        })
2565        .cloned()
2566        .collect()
2567}
2568
2569fn filter_orderable_options(
2570    options: &[OrderableDbInstanceOption],
2571    engine: &Option<String>,
2572    engine_version: &Option<String>,
2573    db_instance_class: &Option<String>,
2574    license_model: &Option<String>,
2575    vpc: Option<bool>,
2576) -> Vec<OrderableDbInstanceOption> {
2577    options
2578        .iter()
2579        .filter(|candidate| {
2580            engine
2581                .as_ref()
2582                .is_none_or(|expected| candidate.engine == *expected)
2583        })
2584        .filter(|candidate| {
2585            engine_version
2586                .as_ref()
2587                .is_none_or(|expected| candidate.engine_version == *expected)
2588        })
2589        .filter(|candidate| {
2590            db_instance_class
2591                .as_ref()
2592                .is_none_or(|expected| candidate.db_instance_class == *expected)
2593        })
2594        .filter(|candidate| {
2595            license_model
2596                .as_ref()
2597                .is_none_or(|expected| candidate.license_model == *expected)
2598        })
2599        .filter(|_| vpc.unwrap_or(true))
2600        .cloned()
2601        .collect()
2602}
2603
2604/// Build a `DbInstance` for a newly-created read replica, copying the
2605/// source instance's physical attributes and binding the replica's
2606/// identifier, ARN, resource id, container id and host port.
2607#[allow(clippy::too_many_arguments)]
2608/// Build a `DbInstance` from a restored snapshot. Copies the physical
2609/// attributes off the snapshot and binds the new instance's identifier,
2610/// ARN, resource id, container id and host port.
2611fn build_restored_instance(
2612    db_instance_identifier: &str,
2613    db_instance_arn: String,
2614    dbi_resource_id: String,
2615    created_at: chrono::DateTime<Utc>,
2616    vpc_security_group_ids: Vec<String>,
2617    snapshot: &DbSnapshot,
2618    running: &crate::runtime::RunningDbContainer,
2619) -> DbInstance {
2620    DbInstance {
2621        db_instance_identifier: db_instance_identifier.to_string(),
2622        db_instance_arn,
2623        db_instance_class: "db.t3.micro".to_string(),
2624        engine: snapshot.engine.clone(),
2625        engine_version: snapshot.engine_version.clone(),
2626        db_instance_status: "available".to_string(),
2627        master_username: snapshot.master_username.clone(),
2628        db_name: snapshot.db_name.clone(),
2629        endpoint_address: "127.0.0.1".to_string(),
2630        port: i32::from(running.host_port),
2631        allocated_storage: snapshot.allocated_storage,
2632        publicly_accessible: true,
2633        deletion_protection: false,
2634        created_at,
2635        dbi_resource_id,
2636        master_user_password: snapshot.master_user_password.clone(),
2637        container_id: running.container_id.clone(),
2638        host_port: running.host_port,
2639        tags: Vec::new(),
2640        read_replica_source_db_instance_identifier: None,
2641        read_replica_db_instance_identifiers: Vec::new(),
2642        vpc_security_group_ids,
2643        db_parameter_group_name: None,
2644        backup_retention_period: 1,
2645        preferred_backup_window: "03:00-04:00".to_string(),
2646        latest_restorable_time: Some(created_at),
2647        option_group_name: None,
2648        multi_az: false,
2649        pending_modified_values: None,
2650    }
2651}
2652
2653fn build_read_replica_instance(
2654    db_instance_identifier: &str,
2655    db_instance_arn: String,
2656    dbi_resource_id: String,
2657    created_at: chrono::DateTime<Utc>,
2658    source_db_instance_identifier: &str,
2659    source: &DbInstance,
2660    running: &crate::runtime::RunningDbContainer,
2661) -> DbInstance {
2662    DbInstance {
2663        db_instance_identifier: db_instance_identifier.to_string(),
2664        db_instance_arn,
2665        db_instance_class: source.db_instance_class.clone(),
2666        engine: source.engine.clone(),
2667        engine_version: source.engine_version.clone(),
2668        db_instance_status: "available".to_string(),
2669        master_username: source.master_username.clone(),
2670        db_name: source.db_name.clone(),
2671        endpoint_address: "127.0.0.1".to_string(),
2672        port: i32::from(running.host_port),
2673        allocated_storage: source.allocated_storage,
2674        publicly_accessible: source.publicly_accessible,
2675        deletion_protection: false,
2676        created_at,
2677        dbi_resource_id,
2678        master_user_password: source.master_user_password.clone(),
2679        container_id: running.container_id.clone(),
2680        host_port: running.host_port,
2681        tags: Vec::new(),
2682        read_replica_source_db_instance_identifier: Some(source_db_instance_identifier.to_string()),
2683        read_replica_db_instance_identifiers: Vec::new(),
2684        vpc_security_group_ids: source.vpc_security_group_ids.clone(),
2685        db_parameter_group_name: source.db_parameter_group_name.clone(),
2686        backup_retention_period: source.backup_retention_period,
2687        preferred_backup_window: source.preferred_backup_window.clone(),
2688        latest_restorable_time: if source.backup_retention_period > 0 {
2689            Some(created_at)
2690        } else {
2691            None
2692        },
2693        option_group_name: source.option_group_name.clone(),
2694        multi_az: source.multi_az,
2695        pending_modified_values: None,
2696    }
2697}
2698
2699fn engine_version_xml(version: &EngineVersionInfo) -> String {
2700    format!(
2701        "<DBEngineVersion>\
2702         <Engine>{}</Engine>\
2703         <EngineVersion>{}</EngineVersion>\
2704         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2705         <DBEngineDescription>{}</DBEngineDescription>\
2706         <DBEngineVersionDescription>{}</DBEngineVersionDescription>\
2707         <Status>{}</Status>\
2708         </DBEngineVersion>",
2709        xml_escape(&version.engine),
2710        xml_escape(&version.engine_version),
2711        xml_escape(&version.db_parameter_group_family),
2712        xml_escape(&version.db_engine_description),
2713        xml_escape(&version.db_engine_version_description),
2714        xml_escape(&version.status),
2715    )
2716}
2717
2718fn orderable_option_xml(option: &OrderableDbInstanceOption) -> String {
2719    format!(
2720        "<OrderableDBInstanceOption>\
2721         <Engine>{}</Engine>\
2722         <EngineVersion>{}</EngineVersion>\
2723         <DBInstanceClass>{}</DBInstanceClass>\
2724         <LicenseModel>{}</LicenseModel>\
2725         <AvailabilityZones><AvailabilityZone><Name>us-east-1a</Name></AvailabilityZone></AvailabilityZones>\
2726         <MultiAZCapable>true</MultiAZCapable>\
2727         <ReadReplicaCapable>true</ReadReplicaCapable>\
2728         <Vpc>true</Vpc>\
2729         <SupportsStorageEncryption>true</SupportsStorageEncryption>\
2730         <StorageType>{}</StorageType>\
2731         <SupportsIops>false</SupportsIops>\
2732         <MinStorageSize>{}</MinStorageSize>\
2733         <MaxStorageSize>{}</MaxStorageSize>\
2734         <SupportsIAMDatabaseAuthentication>true</SupportsIAMDatabaseAuthentication>\
2735         </OrderableDBInstanceOption>",
2736        xml_escape(&option.engine),
2737        xml_escape(&option.engine_version),
2738        xml_escape(&option.db_instance_class),
2739        xml_escape(&option.license_model),
2740        xml_escape(&option.storage_type),
2741        option.min_storage_size,
2742        option.max_storage_size,
2743    )
2744}
2745
2746fn tag_xml(tag: &RdsTag) -> String {
2747    format!(
2748        "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2749        xml_escape(&tag.key),
2750        xml_escape(&tag.value),
2751    )
2752}
2753
2754/// Free-standing version of `emit_event` so background tasks (which
2755/// don't have a `&self`) can publish RDS events through the same path.
2756pub(crate) fn emit_event_static(
2757    delivery_bus: Option<&Arc<DeliveryBus>>,
2758    source_type: RdsSourceType,
2759    source_identifier: &str,
2760    source_arn: &str,
2761    event_id: &str,
2762    event_categories: &[&str],
2763    message: &str,
2764) {
2765    let Some(bus) = delivery_bus else {
2766        return;
2767    };
2768    let detail = serde_json::json!({
2769        "EventCategories": event_categories,
2770        "SourceType": source_type.as_str(),
2771        "SourceArn": source_arn,
2772        "Date": Utc::now().to_rfc3339(),
2773        "Message": message,
2774        "SourceIdentifier": source_identifier,
2775        "EventID": event_id,
2776    });
2777    bus.put_event_to_eventbridge(
2778        "aws.rds",
2779        source_type.detail_type(),
2780        &detail.to_string(),
2781        "default",
2782    );
2783}
2784
2785fn db_instance_xml(instance: &DbInstance, status_override: Option<&str>) -> String {
2786    let status = status_override.unwrap_or(&instance.db_instance_status);
2787    let db_name_xml = instance
2788        .db_name
2789        .as_ref()
2790        .map(|db_name| format!("<DBName>{}</DBName>", xml_escape(db_name)))
2791        .unwrap_or_default();
2792
2793    let read_replica_source_xml = instance
2794        .read_replica_source_db_instance_identifier
2795        .as_ref()
2796        .map(|source| {
2797            format!(
2798                "<ReadReplicaSourceDBInstanceIdentifier>{}</ReadReplicaSourceDBInstanceIdentifier>",
2799                xml_escape(source)
2800            )
2801        })
2802        .unwrap_or_default();
2803
2804    let read_replica_identifiers_xml = if instance.read_replica_db_instance_identifiers.is_empty() {
2805        "<ReadReplicaDBInstanceIdentifiers/>".to_string()
2806    } else {
2807        format!(
2808            "<ReadReplicaDBInstanceIdentifiers>{}</ReadReplicaDBInstanceIdentifiers>",
2809            instance
2810                .read_replica_db_instance_identifiers
2811                .iter()
2812                .map(|id| format!(
2813                    "<ReadReplicaDBInstanceIdentifier>{}</ReadReplicaDBInstanceIdentifier>",
2814                    xml_escape(id)
2815                ))
2816                .collect::<String>()
2817        )
2818    };
2819
2820    let vpc_security_groups_xml = if instance.vpc_security_group_ids.is_empty() {
2821        "<VpcSecurityGroups/>".to_string()
2822    } else {
2823        format!(
2824            "<VpcSecurityGroups>{}</VpcSecurityGroups>",
2825            instance
2826                .vpc_security_group_ids
2827                .iter()
2828                .map(|sg_id| format!(
2829                    "<VpcSecurityGroupMembership>\
2830                     <VpcSecurityGroupId>{}</VpcSecurityGroupId>\
2831                     <Status>active</Status>\
2832                     </VpcSecurityGroupMembership>",
2833                    xml_escape(sg_id)
2834                ))
2835                .collect::<String>()
2836        )
2837    };
2838
2839    let db_parameter_groups_xml = match &instance.db_parameter_group_name {
2840        Some(pg_name) => format!(
2841            "<DBParameterGroups>\
2842             <DBParameterGroup>\
2843             <DBParameterGroupName>{}</DBParameterGroupName>\
2844             <ParameterApplyStatus>in-sync</ParameterApplyStatus>\
2845             </DBParameterGroup>\
2846             </DBParameterGroups>",
2847            xml_escape(pg_name)
2848        ),
2849        None => "<DBParameterGroups/>".to_string(),
2850    };
2851
2852    let option_group_memberships_xml = match &instance.option_group_name {
2853        Some(og_name) => format!(
2854            "<OptionGroupMemberships>\
2855             <OptionGroupMembership>\
2856             <OptionGroupName>{}</OptionGroupName>\
2857             <Status>in-sync</Status>\
2858             </OptionGroupMembership>\
2859             </OptionGroupMemberships>",
2860            xml_escape(og_name)
2861        ),
2862        None => "<OptionGroupMemberships/>".to_string(),
2863    };
2864
2865    let pending_modified_values_xml = if let Some(ref pending) = instance.pending_modified_values {
2866        let mut fields = Vec::new();
2867        if let Some(ref class) = pending.db_instance_class {
2868            fields.push(format!(
2869                "<DBInstanceClass>{}</DBInstanceClass>",
2870                xml_escape(class)
2871            ));
2872        }
2873        if let Some(allocated_storage) = pending.allocated_storage {
2874            fields.push(format!(
2875                "<AllocatedStorage>{}</AllocatedStorage>",
2876                allocated_storage
2877            ));
2878        }
2879        if let Some(backup_retention_period) = pending.backup_retention_period {
2880            fields.push(format!(
2881                "<BackupRetentionPeriod>{}</BackupRetentionPeriod>",
2882                backup_retention_period
2883            ));
2884        }
2885        if let Some(multi_az) = pending.multi_az {
2886            fields.push(format!(
2887                "<MultiAZ>{}</MultiAZ>",
2888                if multi_az { "true" } else { "false" }
2889            ));
2890        }
2891        if let Some(ref engine_version) = pending.engine_version {
2892            fields.push(format!(
2893                "<EngineVersion>{}</EngineVersion>",
2894                xml_escape(engine_version)
2895            ));
2896        }
2897        if pending.master_user_password.is_some() {
2898            fields.push("<MasterUserPassword>****</MasterUserPassword>".to_string());
2899        }
2900        if !fields.is_empty() {
2901            format!(
2902                "<PendingModifiedValues>{}</PendingModifiedValues>",
2903                fields.join("")
2904            )
2905        } else {
2906            String::new()
2907        }
2908    } else {
2909        String::new()
2910    };
2911
2912    let latest_restorable_time_xml = instance
2913        .latest_restorable_time
2914        .map(|t| {
2915            format!(
2916                "<LatestRestorableTime>{}</LatestRestorableTime>",
2917                t.to_rfc3339()
2918            )
2919        })
2920        .unwrap_or_default();
2921
2922    // Endpoint is suppressed while the container is still coming up so
2923    // SDK callers don't try to dial an empty host:0. Once the background
2924    // task fills in `endpoint_address` and `port`, DescribeDBInstances
2925    // returns the real endpoint.
2926    let endpoint_xml = if instance.endpoint_address.is_empty() || instance.port == 0 {
2927        String::new()
2928    } else {
2929        format!(
2930            "<Endpoint><Address>{}</Address><Port>{}</Port></Endpoint>",
2931            xml_escape(&instance.endpoint_address),
2932            instance.port
2933        )
2934    };
2935
2936    format!(
2937        "<DBInstanceIdentifier>{identifier}</DBInstanceIdentifier>\
2938         <DBInstanceClass>{class}</DBInstanceClass>\
2939         <Engine>{engine}</Engine>\
2940         <DBInstanceStatus>{status}</DBInstanceStatus>\
2941         <MasterUsername>{master_username}</MasterUsername>\
2942         {db_name_xml}\
2943         {endpoint_xml}\
2944         <AllocatedStorage>{allocated_storage}</AllocatedStorage>\
2945         <InstanceCreateTime>{create_time}</InstanceCreateTime>\
2946         <PreferredBackupWindow>{preferred_backup_window}</PreferredBackupWindow>\
2947         <BackupRetentionPeriod>{backup_retention_period}</BackupRetentionPeriod>\
2948         <DBSecurityGroups/>\
2949         {vpc_security_groups_xml}\
2950         {db_parameter_groups_xml}\
2951         <AvailabilityZone>us-east-1a</AvailabilityZone>\
2952         {latest_restorable_time_xml}\
2953         <PreferredMaintenanceWindow>sun:00:00-sun:00:30</PreferredMaintenanceWindow>\
2954         <MultiAZ>{multi_az}</MultiAZ>\
2955         <EngineVersion>{engine_version}</EngineVersion>\
2956         <AutoMinorVersionUpgrade>true</AutoMinorVersionUpgrade>\
2957         {read_replica_identifiers_xml}\
2958         {read_replica_source_xml}\
2959         <LicenseModel>{license_model}</LicenseModel>\
2960         {option_group_memberships_xml}\
2961         <PubliclyAccessible>{publicly_accessible}</PubliclyAccessible>\
2962         <StorageType>gp2</StorageType>\
2963         <DbInstancePort>{port}</DbInstancePort>\
2964         <StorageEncrypted>false</StorageEncrypted>\
2965         <DbiResourceId>{dbi_resource_id}</DbiResourceId>\
2966         <DeletionProtection>{deletion_protection}</DeletionProtection>\
2967         {pending_modified_values_xml}\
2968         <DBInstanceArn>{arn}</DBInstanceArn>",
2969        identifier = xml_escape(&instance.db_instance_identifier),
2970        class = xml_escape(&instance.db_instance_class),
2971        engine = xml_escape(&instance.engine),
2972        status = xml_escape(status),
2973        master_username = xml_escape(&instance.master_username),
2974        port = instance.port,
2975        allocated_storage = instance.allocated_storage,
2976        create_time = instance.created_at.to_rfc3339(),
2977        preferred_backup_window = xml_escape(&instance.preferred_backup_window),
2978        backup_retention_period = instance.backup_retention_period,
2979        multi_az = if instance.multi_az { "true" } else { "false" },
2980        engine_version = xml_escape(&instance.engine_version),
2981        license_model = license_model_for_engine(&instance.engine),
2982        publicly_accessible = if instance.publicly_accessible {
2983            "true"
2984        } else {
2985            "false"
2986        },
2987        dbi_resource_id = xml_escape(&instance.dbi_resource_id),
2988        deletion_protection = if instance.deletion_protection {
2989            "true"
2990        } else {
2991            "false"
2992        },
2993        arn = xml_escape(&instance.db_instance_arn),
2994    )
2995}
2996
2997fn db_snapshot_xml(snapshot: &DbSnapshot) -> String {
2998    format!(
2999        "<DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\
3000         <DBInstanceIdentifier>{}</DBInstanceIdentifier>\
3001         <SnapshotCreateTime>{}</SnapshotCreateTime>\
3002         <Engine>{}</Engine>\
3003         <EngineVersion>{}</EngineVersion>\
3004         <AllocatedStorage>{}</AllocatedStorage>\
3005         <Status>{}</Status>\
3006         <Port>{}</Port>\
3007         <MasterUsername>{}</MasterUsername>\
3008         {}\
3009         <DbiResourceId>{}</DbiResourceId>\
3010         <SnapshotType>{}</SnapshotType>\
3011         <DBSnapshotArn>{}</DBSnapshotArn>",
3012        xml_escape(&snapshot.db_snapshot_identifier),
3013        xml_escape(&snapshot.db_instance_identifier),
3014        snapshot.snapshot_create_time.to_rfc3339(),
3015        xml_escape(&snapshot.engine),
3016        xml_escape(&snapshot.engine_version),
3017        snapshot.allocated_storage,
3018        xml_escape(&snapshot.status),
3019        snapshot.port,
3020        xml_escape(&snapshot.master_username),
3021        snapshot
3022            .db_name
3023            .as_ref()
3024            .map(|name| format!("<DBName>{}</DBName>", xml_escape(name)))
3025            .unwrap_or_default(),
3026        xml_escape(&snapshot.dbi_resource_id),
3027        xml_escape(&snapshot.snapshot_type),
3028        xml_escape(&snapshot.db_snapshot_arn),
3029    )
3030}
3031
3032fn db_subnet_group_xml(subnet_group: &DbSubnetGroup) -> String {
3033    let subnets_xml = subnet_group
3034        .subnet_ids
3035        .iter()
3036        .zip(&subnet_group.subnet_availability_zones)
3037        .map(|(subnet_id, az)| {
3038            format!(
3039                "<Subnet>\
3040                 <SubnetIdentifier>{}</SubnetIdentifier>\
3041                 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
3042                 <SubnetStatus>Active</SubnetStatus>\
3043                 </Subnet>",
3044                xml_escape(subnet_id),
3045                xml_escape(az)
3046            )
3047        })
3048        .collect::<String>();
3049
3050    format!(
3051        "<DBSubnetGroupName>{}</DBSubnetGroupName>\
3052         <DBSubnetGroupDescription>{}</DBSubnetGroupDescription>\
3053         <VpcId>{}</VpcId>\
3054         <SubnetGroupStatus>Complete</SubnetGroupStatus>\
3055         <Subnets>{}</Subnets>\
3056         <DBSubnetGroupArn>{}</DBSubnetGroupArn>",
3057        xml_escape(&subnet_group.db_subnet_group_name),
3058        xml_escape(&subnet_group.db_subnet_group_description),
3059        xml_escape(&subnet_group.vpc_id),
3060        subnets_xml,
3061        xml_escape(&subnet_group.db_subnet_group_arn),
3062    )
3063}
3064
3065fn db_parameter_group_xml(parameter_group: &DbParameterGroup) -> String {
3066    format!(
3067        "<DBParameterGroupName>{}</DBParameterGroupName>\
3068         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
3069         <Description>{}</Description>\
3070         <DBParameterGroupArn>{}</DBParameterGroupArn>",
3071        xml_escape(&parameter_group.db_parameter_group_name),
3072        xml_escape(&parameter_group.db_parameter_group_family),
3073        xml_escape(&parameter_group.description),
3074        xml_escape(&parameter_group.db_parameter_group_arn),
3075    )
3076}
3077
3078fn db_instance_not_found(identifier: &str) -> AwsServiceError {
3079    AwsServiceError::aws_error(
3080        StatusCode::NOT_FOUND,
3081        "DBInstanceNotFound",
3082        format!("DBInstance {} not found.", identifier),
3083    )
3084}
3085
3086fn db_snapshot_not_found(identifier: &str) -> AwsServiceError {
3087    AwsServiceError::aws_error(
3088        StatusCode::NOT_FOUND,
3089        "DBSnapshotNotFound",
3090        format!("DBSnapshot {} not found.", identifier),
3091    )
3092}
3093
3094fn db_instance_not_found_by_arn(resource_name: &str) -> AwsServiceError {
3095    AwsServiceError::aws_error(
3096        StatusCode::NOT_FOUND,
3097        "DBInstanceNotFound",
3098        format!("DBInstance {resource_name} not found."),
3099    )
3100}
3101
3102fn find_instance_by_arn<'a>(
3103    state: &'a crate::state::RdsState,
3104    resource_name: &str,
3105) -> Result<&'a DbInstance, AwsServiceError> {
3106    state
3107        .instances
3108        .values()
3109        .find(|instance| instance.db_instance_arn == resource_name)
3110        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
3111}
3112
3113fn find_instance_by_arn_mut<'a>(
3114    state: &'a mut crate::state::RdsState,
3115    resource_name: &str,
3116) -> Result<&'a mut DbInstance, AwsServiceError> {
3117    state
3118        .instances
3119        .values_mut()
3120        .find(|instance| instance.db_instance_arn == resource_name)
3121        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
3122}
3123
3124fn merge_tags(existing: &mut Vec<RdsTag>, incoming: &[RdsTag]) {
3125    for tag in incoming {
3126        if let Some(existing_tag) = existing
3127            .iter_mut()
3128            .find(|candidate| candidate.key == tag.key)
3129        {
3130            existing_tag.value = tag.value.clone();
3131        } else {
3132            existing.push(tag.clone());
3133        }
3134    }
3135}
3136
3137fn license_model_for_engine(engine: &str) -> &'static str {
3138    // Match AWS's reported license model exactly. Oracle and SQL Server
3139    // both use the BYOL/license-included split; fakecloud reports
3140    // license-included since the upstream dev-edition images are
3141    // free-to-use. Db2 is reported as bring-your-own-license to mirror
3142    // AWS's RDS for Db2 default.
3143    match engine {
3144        "mysql" | "mariadb" => "general-public-license",
3145        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "license-included",
3146        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "license-included",
3147        "db2-se" | "db2-ae" => "bring-your-own-license",
3148        _ => "postgresql-license",
3149    }
3150}
3151
3152fn default_db_name(engine: &str) -> &'static str {
3153    match engine {
3154        "mysql" | "mariadb" => "mysql",
3155        // Oracle's gvenzl image creates an `ORACLE_DATABASE` alongside
3156        // the built-in FREEPDB1 — keep `ORCL` as the default name to
3157        // match what AWS RDS for Oracle returns when you don't pass
3158        // `DBName`.
3159        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "ORCL",
3160        // SQL Server installs system DBs by default; AWS doesn't
3161        // create a user DB unless `DBName` is supplied. Use `master`
3162        // as the default the SDK can connect to.
3163        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "master",
3164        "db2-se" | "db2-ae" => "BLUDB",
3165        _ => "postgres",
3166    }
3167}
3168
3169/// Pick the port AWS defaults to for a freshly-created instance of
3170/// `engine`. Mirrors the AWS RDS defaults so client SDKs that connect
3171/// without an explicit `--port` flag hit the right listener.
3172fn default_port_for_engine(engine: &str) -> i32 {
3173    match engine {
3174        "postgres" => 5432,
3175        "mysql" | "mariadb" => 3306,
3176        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => 1521,
3177        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => 1433,
3178        "db2-se" | "db2-ae" => 50000,
3179        _ => 5432,
3180    }
3181}
3182
3183/// Pick the built-in parameter group name AWS assigns to a new
3184/// instance when the caller doesn't override it. The name encodes the
3185/// engine family plus its major version (e.g. `default.postgres16`,
3186/// `default.mysql8.0`, `default.oracle-ee-23`, `default.sqlserver-ex-16`,
3187/// `default.db2-se-11.5`).
3188fn default_parameter_group(engine: &str, engine_version: &str) -> String {
3189    match engine {
3190        "postgres" => {
3191            let major = engine_version.split('.').next().unwrap_or("16");
3192            format!("default.postgres{}", major)
3193        }
3194        "mysql" => {
3195            let major = if engine_version.starts_with("5.7") {
3196                "5.7"
3197            } else {
3198                "8.0"
3199            };
3200            format!("default.mysql{}", major)
3201        }
3202        "mariadb" => {
3203            let major = if engine_version.starts_with("11.4") {
3204                "11.4"
3205            } else if engine_version.starts_with("10.11") {
3206                "10.11"
3207            } else {
3208                "10.6"
3209            };
3210            format!("default.mariadb{}", major)
3211        }
3212        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
3213            let major = engine_version.split('.').next().unwrap_or("23");
3214            format!("default.{engine}-{major}")
3215        }
3216        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
3217            // AWS uses the SQL Server major-version number ("16" for
3218            // 2022, "15" for 2019) in the default parameter group.
3219            let major = engine_version.split('.').next().unwrap_or("16");
3220            format!("default.{engine}-{major}")
3221        }
3222        "db2-se" | "db2-ae" => {
3223            // Db2 ships major.minor as the parameter-group key
3224            // (e.g. `default.db2-se-11.5`).
3225            let mut parts = engine_version.split('.');
3226            let major = parts.next().unwrap_or("11");
3227            let minor = parts.next().unwrap_or("5");
3228            format!("default.{engine}-{major}.{minor}")
3229        }
3230        _ => "default.postgres16".to_string(),
3231    }
3232}
3233
3234fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
3235    match error {
3236        RuntimeError::Unavailable => AwsServiceError::aws_error(
3237            StatusCode::SERVICE_UNAVAILABLE,
3238            "InvalidParameterValue",
3239            "Docker/Podman is required for RDS DB instances but is not available",
3240        ),
3241        RuntimeError::ContainerStartFailed(message) => AwsServiceError::aws_error(
3242            StatusCode::INTERNAL_SERVER_ERROR,
3243            "InternalFailure",
3244            message,
3245        ),
3246    }
3247}
3248
3249#[cfg(test)]
3250mod tests {
3251    use std::collections::HashMap;
3252    use std::sync::Arc;
3253
3254    use bytes::Bytes;
3255    use chrono::Utc;
3256    use http::{HeaderMap, Method};
3257    use parking_lot::RwLock;
3258    use uuid::Uuid;
3259
3260    use super::{
3261        db_instance_xml, default_db_name, default_parameter_group, default_port_for_engine,
3262        filter_engine_versions, filter_orderable_options, license_model_for_engine, merge_tags,
3263        optional_i32_param, parse_tag_keys, parse_tags, validate_create_request, RdsService,
3264        RdsSourceType,
3265    };
3266    use crate::state::{default_engine_versions, default_orderable_options, DbInstance, RdsTag};
3267    use fakecloud_core::delivery::DeliveryBus;
3268    use fakecloud_core::service::{AwsRequest, AwsService, AwsServiceError};
3269
3270    #[test]
3271    fn default_port_matches_aws_for_each_engine() {
3272        assert_eq!(default_port_for_engine("postgres"), 5432);
3273        assert_eq!(default_port_for_engine("mysql"), 3306);
3274        assert_eq!(default_port_for_engine("mariadb"), 3306);
3275        assert_eq!(default_port_for_engine("oracle-ee"), 1521);
3276        assert_eq!(default_port_for_engine("oracle-se2"), 1521);
3277        assert_eq!(default_port_for_engine("sqlserver-ee"), 1433);
3278        assert_eq!(default_port_for_engine("sqlserver-ex"), 1433);
3279        assert_eq!(default_port_for_engine("db2-se"), 50000);
3280        assert_eq!(default_port_for_engine("db2-ae"), 50000);
3281    }
3282
3283    #[test]
3284    fn default_parameter_group_uses_engine_major_version() {
3285        assert_eq!(
3286            default_parameter_group("postgres", "16.3"),
3287            "default.postgres16"
3288        );
3289        assert_eq!(
3290            default_parameter_group("mysql", "8.0.35"),
3291            "default.mysql8.0"
3292        );
3293        assert_eq!(
3294            default_parameter_group("oracle-ee", "23.0.0"),
3295            "default.oracle-ee-23"
3296        );
3297        assert_eq!(
3298            default_parameter_group("sqlserver-ex", "16.00.4085.2.v1"),
3299            "default.sqlserver-ex-16"
3300        );
3301        assert_eq!(
3302            default_parameter_group("db2-se", "11.5.9.0.sb00000000.r1"),
3303            "default.db2-se-11.5"
3304        );
3305    }
3306
3307    #[test]
3308    fn license_model_reflects_engine_class() {
3309        assert_eq!(license_model_for_engine("postgres"), "postgresql-license");
3310        assert_eq!(license_model_for_engine("mysql"), "general-public-license");
3311        assert_eq!(license_model_for_engine("oracle-ee"), "license-included");
3312        assert_eq!(license_model_for_engine("sqlserver-se"), "license-included");
3313        assert_eq!(license_model_for_engine("db2-ae"), "bring-your-own-license");
3314    }
3315
3316    #[test]
3317    fn default_db_name_picks_per_engine_default() {
3318        assert_eq!(default_db_name("postgres"), "postgres");
3319        assert_eq!(default_db_name("mysql"), "mysql");
3320        assert_eq!(default_db_name("oracle-ee"), "ORCL");
3321        assert_eq!(default_db_name("sqlserver-ex"), "master");
3322        assert_eq!(default_db_name("db2-se"), "BLUDB");
3323    }
3324
3325    #[test]
3326    fn validate_create_request_accepts_new_engines() {
3327        for (engine, version, port) in [
3328            ("oracle-ee", "23.0.0", 1521),
3329            ("sqlserver-ex", "16.00.4085.2.v1", 1433),
3330            ("db2-se", "11.5.9.0.sb00000000.r1", 50000),
3331        ] {
3332            validate_create_request("test-db", 20, "db.t3.micro", engine, version, port)
3333                .expect("engine should be accepted");
3334        }
3335    }
3336
3337    #[test]
3338    fn validate_create_request_rejects_unsupported_engine_version() {
3339        let err =
3340            validate_create_request("test-db", 20, "db.t3.micro", "oracle-ee", "12.0.0", 1521)
3341                .expect_err("12.x is not in the supported list");
3342        let msg = format!("{err:?}");
3343        assert!(msg.contains("EngineVersion"), "unexpected: {msg}");
3344    }
3345
3346    #[test]
3347    fn filter_engine_versions_matches_requested_engine() {
3348        let versions = default_engine_versions();
3349
3350        let filtered =
3351            filter_engine_versions(&versions, &Some("postgres".to_string()), &None, &None);
3352
3353        assert_eq!(filtered.len(), 4); // All postgres versions
3354        assert!(filtered.iter().all(|v| v.engine == "postgres"));
3355    }
3356
3357    #[test]
3358    fn filter_orderable_options_respects_instance_class() {
3359        let options = default_orderable_options();
3360
3361        let filtered = filter_orderable_options(
3362            &options,
3363            &Some("postgres".to_string()),
3364            &Some("16.3".to_string()),
3365            &Some("db.t3.micro".to_string()),
3366            &None,
3367            Some(true),
3368        );
3369
3370        assert_eq!(filtered.len(), 1);
3371        assert_eq!(filtered[0].db_instance_class, "db.t3.micro");
3372    }
3373
3374    #[test]
3375    fn validate_create_request_rejects_unsupported_engine() {
3376        let error = validate_create_request("test-db", 20, "db.t3.micro", "mysql", "16.3", 5432)
3377            .expect_err("unsupported engine");
3378
3379        assert_eq!(error.code(), "InvalidParameterValue");
3380    }
3381
3382    #[test]
3383    fn optional_i32_param_rejects_invalid_integer() {
3384        let request = request("CreateDBInstance", &[("Port", "not-a-number")]);
3385
3386        let error = optional_i32_param(&request, "Port").expect_err("invalid port");
3387
3388        assert_eq!(error.code(), "InvalidParameterValue");
3389    }
3390
3391    #[test]
3392    fn db_instance_xml_renders_endpoint_and_status() {
3393        let created_at = Utc::now();
3394        let instance = DbInstance {
3395            db_instance_identifier: "test-db".to_string(),
3396            db_instance_arn: "arn:aws:rds:us-east-1:123456789012:db:test-db".to_string(),
3397            db_instance_class: "db.t3.micro".to_string(),
3398            engine: "postgres".to_string(),
3399            engine_version: "16.3".to_string(),
3400            db_instance_status: "available".to_string(),
3401            master_username: "admin".to_string(),
3402            db_name: Some("appdb".to_string()),
3403            endpoint_address: "127.0.0.1".to_string(),
3404            port: 15432,
3405            allocated_storage: 20,
3406            publicly_accessible: true,
3407            deletion_protection: false,
3408            created_at,
3409            dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3410            master_user_password: "secret123".to_string(),
3411            container_id: "container".to_string(),
3412            host_port: 15432,
3413            tags: Vec::new(),
3414            read_replica_source_db_instance_identifier: None,
3415            read_replica_db_instance_identifiers: Vec::new(),
3416            vpc_security_group_ids: vec!["sg-12345678".to_string()],
3417            db_parameter_group_name: Some("default.postgres16".to_string()),
3418            backup_retention_period: 1,
3419            preferred_backup_window: "03:00-04:00".to_string(),
3420            latest_restorable_time: Some(created_at),
3421            option_group_name: None,
3422            multi_az: false,
3423            pending_modified_values: None,
3424        };
3425
3426        let xml = db_instance_xml(&instance, Some("creating"));
3427
3428        assert!(xml.contains("<DBInstanceIdentifier>test-db</DBInstanceIdentifier>"));
3429        assert!(xml.contains("<DBInstanceStatus>creating</DBInstanceStatus>"));
3430        assert!(xml.contains("<Address>127.0.0.1</Address><Port>15432</Port>"));
3431    }
3432
3433    #[test]
3434    fn parse_tags_reads_rds_query_shape() {
3435        let request = request(
3436            "AddTagsToResource",
3437            &[
3438                ("Tags.Tag.1.Key", "env"),
3439                ("Tags.Tag.1.Value", "dev"),
3440                ("Tags.Tag.2.Key", "team"),
3441                ("Tags.Tag.2.Value", "core"),
3442            ],
3443        );
3444
3445        let tags = parse_tags(&request).expect("tags");
3446
3447        assert_eq!(
3448            tags,
3449            vec![
3450                RdsTag {
3451                    key: "env".to_string(),
3452                    value: "dev".to_string(),
3453                },
3454                RdsTag {
3455                    key: "team".to_string(),
3456                    value: "core".to_string(),
3457                }
3458            ]
3459        );
3460    }
3461
3462    #[test]
3463    fn parse_tag_keys_reads_member_shape() {
3464        let request = request(
3465            "RemoveTagsFromResource",
3466            &[("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")],
3467        );
3468
3469        let tag_keys = parse_tag_keys(&request).expect("tag keys");
3470
3471        assert_eq!(tag_keys, vec!["env".to_string(), "team".to_string()]);
3472    }
3473
3474    #[test]
3475    fn merge_tags_updates_existing_values() {
3476        let mut tags = vec![RdsTag {
3477            key: "env".to_string(),
3478            value: "dev".to_string(),
3479        }];
3480
3481        merge_tags(
3482            &mut tags,
3483            &[
3484                RdsTag {
3485                    key: "env".to_string(),
3486                    value: "prod".to_string(),
3487                },
3488                RdsTag {
3489                    key: "team".to_string(),
3490                    value: "core".to_string(),
3491                },
3492            ],
3493        );
3494
3495        assert_eq!(tags.len(), 2);
3496        assert_eq!(tags[0].value, "prod");
3497        assert_eq!(tags[1].key, "team");
3498    }
3499
3500    #[tokio::test]
3501    async fn describe_engine_versions_returns_xml_body() {
3502        let service = RdsService::new(Arc::new(RwLock::new(
3503            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3504        )));
3505        let request = request("DescribeDBEngineVersions", &[("Engine", "postgres")]);
3506
3507        let response = service.handle(request).await.expect("response");
3508        let body = String::from_utf8(response.body.expect_bytes().to_vec()).expect("utf8");
3509
3510        assert!(body.contains("<DescribeDBEngineVersionsResponse"));
3511        assert!(body.contains("<Engine>postgres</Engine>"));
3512        assert!(body.contains("<DBParameterGroupFamily>postgres16</DBParameterGroupFamily>"));
3513    }
3514
3515    fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
3516        let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
3517        for (key, value) in params {
3518            query_params.insert((*key).to_string(), (*value).to_string());
3519        }
3520
3521        AwsRequest {
3522            service: "rds".to_string(),
3523            action: action.to_string(),
3524            region: "us-east-1".to_string(),
3525            account_id: "123456789012".to_string(),
3526            request_id: "test-request-id".to_string(),
3527            headers: HeaderMap::new(),
3528            query_params,
3529            body: Bytes::new(),
3530            body_stream: parking_lot::Mutex::new(None),
3531            path_segments: vec![],
3532            raw_path: "/".to_string(),
3533            raw_query: String::new(),
3534            method: Method::POST,
3535            is_query_protocol: true,
3536            access_key_id: None,
3537            principal: None,
3538        }
3539    }
3540
3541    // ── Helpers for handler tests ────────────────────────────────────
3542
3543    fn make_service() -> RdsService {
3544        RdsService::new(Arc::new(RwLock::new(
3545            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3546        )))
3547    }
3548
3549    #[derive(Default)]
3550    struct CapturedEvent {
3551        source: String,
3552        detail_type: String,
3553        detail: String,
3554    }
3555
3556    #[derive(Default)]
3557    struct RecordingEb {
3558        events: std::sync::Mutex<Vec<CapturedEvent>>,
3559    }
3560
3561    impl fakecloud_core::delivery::EventBridgeDelivery for RecordingEb {
3562        fn put_event(&self, source: &str, detail_type: &str, detail: &str, _bus: &str) {
3563            self.events.lock().unwrap().push(CapturedEvent {
3564                source: source.to_string(),
3565                detail_type: detail_type.to_string(),
3566                detail: detail.to_string(),
3567            });
3568        }
3569    }
3570
3571    fn make_service_with_recorder() -> (RdsService, Arc<RecordingEb>) {
3572        let recorder = Arc::new(RecordingEb::default());
3573        let bus = Arc::new(DeliveryBus::new().with_eventbridge(recorder.clone()));
3574        let svc = RdsService::new(Arc::new(RwLock::new(
3575            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3576        )))
3577        .with_delivery_bus(bus);
3578        (svc, recorder)
3579    }
3580
3581    #[test]
3582    fn emit_event_emits_aws_rds_event_via_bus() {
3583        let (svc, rec) = make_service_with_recorder();
3584        svc.emit_event(
3585            RdsSourceType::DbInstance,
3586            "my-db",
3587            "arn:aws:rds:us-east-1:123456789012:db:my-db",
3588            "RDS-EVENT-0005",
3589            &["creation"],
3590            "DB instance created",
3591        );
3592        let events = rec.events.lock().unwrap();
3593        assert_eq!(events.len(), 1);
3594        let e = &events[0];
3595        assert_eq!(e.source, "aws.rds");
3596        assert_eq!(e.detail_type, "RDS DB Instance Event");
3597        let detail: serde_json::Value = serde_json::from_str(&e.detail).unwrap();
3598        assert_eq!(detail["EventID"], "RDS-EVENT-0005");
3599        assert_eq!(detail["SourceType"], "DB_INSTANCE");
3600        assert_eq!(detail["SourceIdentifier"], "my-db");
3601        assert_eq!(detail["Message"], "DB instance created");
3602        assert_eq!(detail["EventCategories"][0], "creation");
3603    }
3604
3605    #[test]
3606    fn emit_event_no_op_without_bus() {
3607        let svc = make_service();
3608        svc.emit_event(
3609            RdsSourceType::DbSnapshot,
3610            "snap",
3611            "arn:aws:rds:us-east-1:123456789012:snapshot:snap",
3612            "RDS-EVENT-0042",
3613            &["creation"],
3614            "Manual snapshot created",
3615        );
3616    }
3617
3618    #[test]
3619    fn rds_source_type_detail_type_mapping() {
3620        assert_eq!(
3621            RdsSourceType::DbInstance.detail_type(),
3622            "RDS DB Instance Event"
3623        );
3624        assert_eq!(
3625            RdsSourceType::DbSnapshot.detail_type(),
3626            "RDS DB Snapshot Event"
3627        );
3628        assert_eq!(
3629            RdsSourceType::DbParameterGroup.detail_type(),
3630            "RDS DB Parameter Group Event"
3631        );
3632    }
3633
3634    fn body_of(resp: fakecloud_core::service::AwsResponse) -> String {
3635        String::from_utf8(resp.body.expect_bytes().to_vec()).expect("utf8")
3636    }
3637
3638    fn seed_instance(svc: &RdsService, identifier: &str) -> String {
3639        let arn = format!("arn:aws:rds:us-east-1:123456789012:db:{identifier}");
3640        let mut accounts = svc.state.write();
3641        let state = accounts.default_mut();
3642        state.instances.insert(
3643            identifier.to_string(),
3644            DbInstance {
3645                db_instance_identifier: identifier.to_string(),
3646                db_instance_arn: arn.clone(),
3647                db_instance_class: "db.t3.micro".to_string(),
3648                engine: "postgres".to_string(),
3649                engine_version: "16.3".to_string(),
3650                db_instance_status: "available".to_string(),
3651                master_username: "admin".to_string(),
3652                db_name: Some("appdb".to_string()),
3653                endpoint_address: "127.0.0.1".to_string(),
3654                port: 15432,
3655                allocated_storage: 20,
3656                publicly_accessible: true,
3657                deletion_protection: false,
3658                created_at: Utc::now(),
3659                dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3660                master_user_password: "secret".to_string(),
3661                container_id: "container".to_string(),
3662                host_port: 15432,
3663                tags: Vec::new(),
3664                read_replica_source_db_instance_identifier: None,
3665                read_replica_db_instance_identifiers: Vec::new(),
3666                vpc_security_group_ids: vec!["sg-12345678".to_string()],
3667                db_parameter_group_name: Some("default.postgres16".to_string()),
3668                backup_retention_period: 1,
3669                preferred_backup_window: "03:00-04:00".to_string(),
3670                latest_restorable_time: None,
3671                option_group_name: None,
3672                multi_az: false,
3673                pending_modified_values: None,
3674            },
3675        );
3676        arn
3677    }
3678
3679    fn assert_code<T>(result: Result<T, AwsServiceError>, expected_code: &str) -> AwsServiceError {
3680        match result {
3681            Ok(_) => panic!("expected error {expected_code}, got Ok"),
3682            Err(e) => {
3683                assert_eq!(e.code(), expected_code, "wrong error code");
3684                e
3685            }
3686        }
3687    }
3688
3689    // ── Tag operations ───────────────────────────────────────────────
3690
3691    #[test]
3692    fn add_tags_requires_resource_name() {
3693        let svc = make_service();
3694        let req = request("AddTagsToResource", &[]);
3695        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3696    }
3697
3698    #[test]
3699    fn add_tags_requires_at_least_one_tag() {
3700        let svc = make_service();
3701        let arn = seed_instance(&svc, "db1");
3702        let req = request("AddTagsToResource", &[("ResourceName", arn.as_str())]);
3703        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3704    }
3705
3706    #[test]
3707    fn add_tags_appends_then_list_tags_returns_them() {
3708        let svc = make_service();
3709        let arn = seed_instance(&svc, "db1");
3710        let add_req = request(
3711            "AddTagsToResource",
3712            &[
3713                ("ResourceName", arn.as_str()),
3714                ("Tags.Tag.1.Key", "env"),
3715                ("Tags.Tag.1.Value", "dev"),
3716            ],
3717        );
3718        svc.add_tags_to_resource(&add_req).unwrap();
3719
3720        let list_req = request("ListTagsForResource", &[("ResourceName", arn.as_str())]);
3721        let body = body_of(svc.list_tags_for_resource(&list_req).unwrap());
3722        assert!(body.contains("<Key>env</Key>"));
3723        assert!(body.contains("<Value>dev</Value>"));
3724    }
3725
3726    #[test]
3727    fn list_tags_rejects_filters_param() {
3728        let svc = make_service();
3729        let arn = seed_instance(&svc, "db1");
3730        let req = request(
3731            "ListTagsForResource",
3732            &[
3733                ("ResourceName", arn.as_str()),
3734                ("Filters.Filter.1.Name", "x"),
3735            ],
3736        );
3737        assert_code(svc.list_tags_for_resource(&req), "InvalidParameterValue");
3738    }
3739
3740    #[test]
3741    fn list_tags_unknown_arn_errors() {
3742        let svc = make_service();
3743        let req = request(
3744            "ListTagsForResource",
3745            &[("ResourceName", "arn:aws:rds:us-east-1:123456789012:db:nope")],
3746        );
3747        assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
3748    }
3749
3750    #[test]
3751    fn remove_tags_strips_only_listed_keys() {
3752        let svc = make_service();
3753        let arn = seed_instance(&svc, "db1");
3754        {
3755            let mut __a = svc.state.write();
3756            let state = __a.default_mut();
3757            let inst = state.instances.get_mut("db1").unwrap();
3758            inst.tags = vec![
3759                RdsTag {
3760                    key: "env".to_string(),
3761                    value: "dev".to_string(),
3762                },
3763                RdsTag {
3764                    key: "team".to_string(),
3765                    value: "core".to_string(),
3766                },
3767            ];
3768        }
3769        let req = request(
3770            "RemoveTagsFromResource",
3771            &[("ResourceName", arn.as_str()), ("TagKeys.member.1", "env")],
3772        );
3773        svc.remove_tags_from_resource(&req).unwrap();
3774
3775        let __a = svc.state.read();
3776        let state = __a.default_ref();
3777        let tags = &state.instances.get("db1").unwrap().tags;
3778        assert_eq!(tags.len(), 1);
3779        assert_eq!(tags[0].key, "team");
3780    }
3781
3782    #[test]
3783    fn remove_tags_requires_keys() {
3784        let svc = make_service();
3785        let arn = seed_instance(&svc, "db1");
3786        let req = request("RemoveTagsFromResource", &[("ResourceName", arn.as_str())]);
3787        assert_code(svc.remove_tags_from_resource(&req), "MissingParameter");
3788    }
3789
3790    // ── DB Subnet Groups ─────────────────────────────────────────────
3791
3792    fn create_subnet_group(svc: &RdsService, name: &str) {
3793        let req = request(
3794            "CreateDBSubnetGroup",
3795            &[
3796                ("DBSubnetGroupName", name),
3797                ("DBSubnetGroupDescription", "test"),
3798                ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3799                ("SubnetIds.SubnetIdentifier.2", "subnet-bbb"),
3800            ],
3801        );
3802        svc.create_db_subnet_group(&req).unwrap();
3803    }
3804
3805    #[test]
3806    fn create_db_subnet_group_requires_two_subnets() {
3807        let svc = make_service();
3808        let req = request(
3809            "CreateDBSubnetGroup",
3810            &[
3811                ("DBSubnetGroupName", "sg1"),
3812                ("DBSubnetGroupDescription", "t"),
3813                ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3814            ],
3815        );
3816        assert_code(
3817            svc.create_db_subnet_group(&req),
3818            "DBSubnetGroupDoesNotCoverEnoughAZs",
3819        );
3820    }
3821
3822    #[test]
3823    fn create_db_subnet_group_rejects_empty_subnets() {
3824        let svc = make_service();
3825        let req = request(
3826            "CreateDBSubnetGroup",
3827            &[
3828                ("DBSubnetGroupName", "sg1"),
3829                ("DBSubnetGroupDescription", "t"),
3830            ],
3831        );
3832        assert_code(svc.create_db_subnet_group(&req), "InvalidParameterValue");
3833    }
3834
3835    #[test]
3836    fn create_db_subnet_group_rejects_duplicates() {
3837        let svc = make_service();
3838        create_subnet_group(&svc, "sg1");
3839        let req = request(
3840            "CreateDBSubnetGroup",
3841            &[
3842                ("DBSubnetGroupName", "sg1"),
3843                ("DBSubnetGroupDescription", "t"),
3844                ("SubnetIds.SubnetIdentifier.1", "subnet-x"),
3845                ("SubnetIds.SubnetIdentifier.2", "subnet-y"),
3846            ],
3847        );
3848        assert_code(
3849            svc.create_db_subnet_group(&req),
3850            "DBSubnetGroupAlreadyExists",
3851        );
3852    }
3853
3854    #[test]
3855    fn describe_db_subnet_groups_by_name_or_list() {
3856        let svc = make_service();
3857        create_subnet_group(&svc, "sg-alpha");
3858        create_subnet_group(&svc, "sg-beta");
3859
3860        let by_name = request(
3861            "DescribeDBSubnetGroups",
3862            &[("DBSubnetGroupName", "sg-alpha")],
3863        );
3864        let body = body_of(svc.describe_db_subnet_groups(&by_name).unwrap());
3865        assert!(body.contains("sg-alpha"));
3866        assert!(!body.contains("sg-beta"));
3867
3868        let list_all = request("DescribeDBSubnetGroups", &[]);
3869        let body = body_of(svc.describe_db_subnet_groups(&list_all).unwrap());
3870        assert!(body.contains("sg-alpha"));
3871        assert!(body.contains("sg-beta"));
3872    }
3873
3874    #[test]
3875    fn describe_db_subnet_groups_unknown_name_errors() {
3876        let svc = make_service();
3877        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
3878        assert_code(
3879            svc.describe_db_subnet_groups(&req),
3880            "DBSubnetGroupNotFoundFault",
3881        );
3882    }
3883
3884    #[test]
3885    fn delete_db_subnet_group_unknown_errors() {
3886        let svc = make_service();
3887        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
3888        assert_code(
3889            svc.delete_db_subnet_group(&req),
3890            "DBSubnetGroupNotFoundFault",
3891        );
3892    }
3893
3894    #[test]
3895    fn delete_db_subnet_group_removes_entry() {
3896        let svc = make_service();
3897        create_subnet_group(&svc, "sg1");
3898        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "sg1")]);
3899        svc.delete_db_subnet_group(&req).unwrap();
3900        assert!(svc.state.read().default_ref().subnet_groups.is_empty());
3901    }
3902
3903    #[test]
3904    fn modify_db_subnet_group_updates_subnet_ids() {
3905        let svc = make_service();
3906        create_subnet_group(&svc, "sg1");
3907        let req = request(
3908            "ModifyDBSubnetGroup",
3909            &[
3910                ("DBSubnetGroupName", "sg1"),
3911                ("SubnetIds.SubnetIdentifier.1", "subnet-new1"),
3912                ("SubnetIds.SubnetIdentifier.2", "subnet-new2"),
3913            ],
3914        );
3915        svc.modify_db_subnet_group(&req).unwrap();
3916
3917        let __a = svc.state.read();
3918        let state = __a.default_ref();
3919        let sg = state.subnet_groups.get("sg1").unwrap();
3920        assert_eq!(sg.subnet_ids, vec!["subnet-new1", "subnet-new2"]);
3921    }
3922
3923    // ── DB Parameter Groups ──────────────────────────────────────────
3924
3925    fn create_param_group(svc: &RdsService, name: &str) {
3926        let req = request(
3927            "CreateDBParameterGroup",
3928            &[
3929                ("DBParameterGroupName", name),
3930                ("DBParameterGroupFamily", "postgres16"),
3931                ("Description", "test"),
3932            ],
3933        );
3934        svc.create_db_parameter_group(&req).unwrap();
3935    }
3936
3937    #[test]
3938    fn create_db_parameter_group_rejects_unknown_family() {
3939        let svc = make_service();
3940        let req = request(
3941            "CreateDBParameterGroup",
3942            &[
3943                ("DBParameterGroupName", "pg1"),
3944                ("DBParameterGroupFamily", "oracle19"),
3945                ("Description", "t"),
3946            ],
3947        );
3948        assert_code(svc.create_db_parameter_group(&req), "InvalidParameterValue");
3949    }
3950
3951    #[test]
3952    fn create_db_parameter_group_rejects_duplicates() {
3953        let svc = make_service();
3954        create_param_group(&svc, "pg1");
3955        let req = request(
3956            "CreateDBParameterGroup",
3957            &[
3958                ("DBParameterGroupName", "pg1"),
3959                ("DBParameterGroupFamily", "postgres16"),
3960                ("Description", "t"),
3961            ],
3962        );
3963        assert_code(
3964            svc.create_db_parameter_group(&req),
3965            "DBParameterGroupAlreadyExists",
3966        );
3967    }
3968
3969    #[test]
3970    fn describe_db_parameter_groups_by_name_or_list() {
3971        let svc = make_service();
3972        create_param_group(&svc, "pg-alpha");
3973        create_param_group(&svc, "pg-beta");
3974        let by_name = request(
3975            "DescribeDBParameterGroups",
3976            &[("DBParameterGroupName", "pg-alpha")],
3977        );
3978        let body = body_of(svc.describe_db_parameter_groups(&by_name).unwrap());
3979        assert!(body.contains("pg-alpha"));
3980        assert!(!body.contains("pg-beta"));
3981        let list = request("DescribeDBParameterGroups", &[]);
3982        let body = body_of(svc.describe_db_parameter_groups(&list).unwrap());
3983        assert!(body.contains("pg-alpha"));
3984        assert!(body.contains("pg-beta"));
3985    }
3986
3987    #[test]
3988    fn describe_db_parameter_groups_unknown_name_errors() {
3989        let svc = make_service();
3990        let req = request(
3991            "DescribeDBParameterGroups",
3992            &[("DBParameterGroupName", "ghost")],
3993        );
3994        assert_code(
3995            svc.describe_db_parameter_groups(&req),
3996            "DBParameterGroupNotFound",
3997        );
3998    }
3999
4000    #[test]
4001    fn delete_db_parameter_group_rejects_default_groups() {
4002        let svc = make_service();
4003        let req = request(
4004            "DeleteDBParameterGroup",
4005            &[("DBParameterGroupName", "default.postgres16")],
4006        );
4007        assert_code(svc.delete_db_parameter_group(&req), "InvalidParameterValue");
4008    }
4009
4010    #[test]
4011    fn delete_db_parameter_group_unknown_errors() {
4012        let svc = make_service();
4013        let req = request(
4014            "DeleteDBParameterGroup",
4015            &[("DBParameterGroupName", "ghost")],
4016        );
4017        assert_code(
4018            svc.delete_db_parameter_group(&req),
4019            "DBParameterGroupNotFound",
4020        );
4021    }
4022
4023    #[test]
4024    fn delete_db_parameter_group_removes_entry() {
4025        let svc = make_service();
4026        create_param_group(&svc, "pg1");
4027        let req = request("DeleteDBParameterGroup", &[("DBParameterGroupName", "pg1")]);
4028        svc.delete_db_parameter_group(&req).unwrap();
4029        assert!(!svc
4030            .state
4031            .read()
4032            .default_ref()
4033            .parameter_groups
4034            .contains_key("pg1"));
4035    }
4036
4037    #[test]
4038    fn modify_db_parameter_group_updates_description() {
4039        let svc = make_service();
4040        create_param_group(&svc, "pg1");
4041        let req = request(
4042            "ModifyDBParameterGroup",
4043            &[
4044                ("DBParameterGroupName", "pg1"),
4045                ("Description", "shiny new"),
4046            ],
4047        );
4048        svc.modify_db_parameter_group(&req).unwrap();
4049        let __a = svc.state.read();
4050        let state = __a.default_ref();
4051        assert_eq!(
4052            state.parameter_groups.get("pg1").unwrap().description,
4053            "shiny new"
4054        );
4055    }
4056
4057    #[test]
4058    fn modify_db_parameter_group_unknown_errors() {
4059        let svc = make_service();
4060        let req = request(
4061            "ModifyDBParameterGroup",
4062            &[("DBParameterGroupName", "ghost"), ("Description", "x")],
4063        );
4064        assert_code(
4065            svc.modify_db_parameter_group(&req),
4066            "DBParameterGroupNotFound",
4067        );
4068    }
4069
4070    // ── DescribeDBInstances ──────────────────────────────────────────
4071
4072    #[test]
4073    fn describe_db_instances_by_id_returns_only_one() {
4074        let svc = make_service();
4075        seed_instance(&svc, "db1");
4076        seed_instance(&svc, "db2");
4077        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "db1")]);
4078        let body = body_of(svc.describe_db_instances(&req).unwrap());
4079        assert!(body.contains("<DBInstanceIdentifier>db1</DBInstanceIdentifier>"));
4080        assert!(!body.contains("<DBInstanceIdentifier>db2</DBInstanceIdentifier>"));
4081    }
4082
4083    #[test]
4084    fn describe_db_instances_unknown_id_errors() {
4085        let svc = make_service();
4086        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
4087        assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
4088    }
4089
4090    #[test]
4091    fn describe_db_instances_lists_all_when_unbounded() {
4092        let svc = make_service();
4093        seed_instance(&svc, "db1");
4094        seed_instance(&svc, "db2");
4095        seed_instance(&svc, "db3");
4096        let req = request("DescribeDBInstances", &[]);
4097        let body = body_of(svc.describe_db_instances(&req).unwrap());
4098        for id in ["db1", "db2", "db3"] {
4099            assert!(body.contains(&format!(
4100                "<DBInstanceIdentifier>{id}</DBInstanceIdentifier>"
4101            )));
4102        }
4103    }
4104
4105    // ── ModifyDBInstance ─────────────────────────────────────────────
4106
4107    #[test]
4108    fn modify_db_instance_requires_at_least_one_change() {
4109        let svc = make_service();
4110        seed_instance(&svc, "db1");
4111        let req = request("ModifyDBInstance", &[("DBInstanceIdentifier", "db1")]);
4112        assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
4113    }
4114
4115    #[test]
4116    fn modify_db_instance_unknown_errors() {
4117        let svc = make_service();
4118        let req = request(
4119            "ModifyDBInstance",
4120            &[
4121                ("DBInstanceIdentifier", "ghost"),
4122                ("DBInstanceClass", "db.t3.small"),
4123            ],
4124        );
4125        assert_code(svc.modify_db_instance(&req), "DBInstanceNotFound");
4126    }
4127
4128    #[test]
4129    fn modify_db_instance_apply_immediately_updates_class() {
4130        let svc = make_service();
4131        seed_instance(&svc, "db1");
4132        let req = request(
4133            "ModifyDBInstance",
4134            &[
4135                ("DBInstanceIdentifier", "db1"),
4136                ("DBInstanceClass", "db.t3.small"),
4137                ("ApplyImmediately", "true"),
4138            ],
4139        );
4140        svc.modify_db_instance(&req).unwrap();
4141        let __a = svc.state.read();
4142        let state = __a.default_ref();
4143        assert_eq!(
4144            state.instances.get("db1").unwrap().db_instance_class,
4145            "db.t3.small"
4146        );
4147    }
4148
4149    #[test]
4150    fn modify_db_instance_pending_when_not_apply_immediately() {
4151        let svc = make_service();
4152        seed_instance(&svc, "db1");
4153        let req = request(
4154            "ModifyDBInstance",
4155            &[
4156                ("DBInstanceIdentifier", "db1"),
4157                ("DBInstanceClass", "db.t3.small"),
4158                ("ApplyImmediately", "false"),
4159            ],
4160        );
4161        svc.modify_db_instance(&req).unwrap();
4162        let __a = svc.state.read();
4163        let state = __a.default_ref();
4164        let inst = state.instances.get("db1").unwrap();
4165        assert_eq!(inst.db_instance_class, "db.t3.micro");
4166        assert_eq!(
4167            inst.pending_modified_values
4168                .as_ref()
4169                .unwrap()
4170                .db_instance_class
4171                .as_deref(),
4172            Some("db.t3.small"),
4173        );
4174    }
4175
4176    // ── Snapshots (sync ops only) ────────────────────────────────────
4177
4178    fn seed_snapshot(svc: &RdsService, snapshot_id: &str, instance_id: &str) {
4179        let mut __a = svc.state.write();
4180        let state = __a.default_mut();
4181        let arn = state.db_snapshot_arn(snapshot_id);
4182        state.snapshots.insert(
4183            snapshot_id.to_string(),
4184            crate::state::DbSnapshot {
4185                db_snapshot_identifier: snapshot_id.to_string(),
4186                db_snapshot_arn: arn,
4187                db_instance_identifier: instance_id.to_string(),
4188                snapshot_create_time: Utc::now(),
4189                engine: "postgres".to_string(),
4190                engine_version: "16.3".to_string(),
4191                allocated_storage: 20,
4192                status: "available".to_string(),
4193                port: 5432,
4194                master_username: "admin".to_string(),
4195                db_name: Some("appdb".to_string()),
4196                dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
4197                snapshot_type: "manual".to_string(),
4198                master_user_password: "secret".to_string(),
4199                tags: Vec::new(),
4200                dump_data: Vec::new(),
4201            },
4202        );
4203    }
4204
4205    #[test]
4206    fn delete_db_snapshot_removes_entry() {
4207        let svc = make_service();
4208        seed_snapshot(&svc, "snap1", "db1");
4209        let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "snap1")]);
4210        svc.delete_db_snapshot(&req).unwrap();
4211        assert!(svc.state.read().default_ref().snapshots.is_empty());
4212    }
4213
4214    #[test]
4215    fn delete_db_snapshot_unknown_errors() {
4216        let svc = make_service();
4217        let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "ghost")]);
4218        assert_code(svc.delete_db_snapshot(&req), "DBSnapshotNotFound");
4219    }
4220
4221    #[test]
4222    fn describe_db_snapshots_rejects_both_filters() {
4223        let svc = make_service();
4224        let req = request(
4225            "DescribeDBSnapshots",
4226            &[("DBSnapshotIdentifier", "s"), ("DBInstanceIdentifier", "i")],
4227        );
4228        assert_code(
4229            svc.describe_db_snapshots(&req),
4230            "InvalidParameterCombination",
4231        );
4232    }
4233
4234    #[test]
4235    fn describe_db_snapshots_by_id_or_instance() {
4236        let svc = make_service();
4237        seed_snapshot(&svc, "snap1", "db1");
4238        seed_snapshot(&svc, "snap2", "db2");
4239
4240        let by_id = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "snap1")]);
4241        let body = body_of(svc.describe_db_snapshots(&by_id).unwrap());
4242        assert!(body.contains("snap1"));
4243        assert!(!body.contains("snap2"));
4244
4245        let by_instance = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "db2")]);
4246        let body = body_of(svc.describe_db_snapshots(&by_instance).unwrap());
4247        assert!(body.contains("snap2"));
4248        assert!(!body.contains("snap1"));
4249
4250        let list_all = request("DescribeDBSnapshots", &[]);
4251        let body = body_of(svc.describe_db_snapshots(&list_all).unwrap());
4252        assert!(body.contains("snap1"));
4253        assert!(body.contains("snap2"));
4254    }
4255
4256    #[test]
4257    fn describe_db_snapshots_unknown_id_errors() {
4258        let svc = make_service();
4259        let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "ghost")]);
4260        assert_code(svc.describe_db_snapshots(&req), "DBSnapshotNotFound");
4261    }
4262
4263    // ── Error branch tests ──
4264
4265    #[test]
4266    fn describe_db_instances_not_found() {
4267        let svc = make_service();
4268        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
4269        assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
4270    }
4271
4272    #[tokio::test]
4273    async fn delete_db_instance_not_found() {
4274        let svc = make_service();
4275        let req = request(
4276            "DeleteDBInstance",
4277            &[
4278                ("DBInstanceIdentifier", "ghost"),
4279                ("SkipFinalSnapshot", "true"),
4280            ],
4281        );
4282        assert_code(svc.delete_db_instance(&req).await, "DBInstanceNotFound");
4283    }
4284
4285    #[test]
4286    fn modify_db_instance_not_found() {
4287        let svc = make_service();
4288        let req = request(
4289            "ModifyDBInstance",
4290            &[
4291                ("DBInstanceIdentifier", "ghost"),
4292                ("AllocatedStorage", "20"),
4293            ],
4294        );
4295        // Validation fires before existence check
4296        assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
4297    }
4298
4299    #[tokio::test]
4300    async fn reboot_db_instance_not_found() {
4301        let svc = make_service();
4302        let req = request("RebootDBInstance", &[("DBInstanceIdentifier", "ghost")]);
4303        assert_code(svc.reboot_db_instance(&req).await, "DBInstanceNotFound");
4304    }
4305
4306    #[tokio::test]
4307    async fn create_db_snapshot_instance_not_found() {
4308        let svc = make_service();
4309        let req = request(
4310            "CreateDBSnapshot",
4311            &[
4312                ("DBInstanceIdentifier", "ghost"),
4313                ("DBSnapshotIdentifier", "snap1"),
4314            ],
4315        );
4316        assert_code(svc.create_db_snapshot(&req).await, "InvalidParameterValue");
4317    }
4318
4319    #[tokio::test]
4320    async fn restore_db_instance_snapshot_not_found() {
4321        let svc = make_service();
4322        let req = request(
4323            "RestoreDBInstanceFromDBSnapshot",
4324            &[
4325                ("DBInstanceIdentifier", "restored"),
4326                ("DBSnapshotIdentifier", "ghost-snap"),
4327            ],
4328        );
4329        assert_code(
4330            svc.restore_db_instance_from_db_snapshot(&req).await,
4331            "InvalidParameterValue",
4332        );
4333    }
4334
4335    #[tokio::test]
4336    async fn create_db_instance_read_replica_source_not_found() {
4337        let svc = make_service();
4338        let req = request(
4339            "CreateDBInstanceReadReplica",
4340            &[
4341                ("DBInstanceIdentifier", "replica"),
4342                ("SourceDBInstanceIdentifier", "ghost"),
4343            ],
4344        );
4345        assert_code(
4346            svc.create_db_instance_read_replica(&req).await,
4347            "InvalidParameterValue",
4348        );
4349    }
4350
4351    #[test]
4352    fn describe_db_engine_versions_basic() {
4353        let svc = make_service();
4354        let req = request("DescribeDBEngineVersions", &[]);
4355        let resp = svc.describe_db_engine_versions(&req).unwrap();
4356        let body = body_of(resp);
4357        assert!(body.contains("<DBEngineVersions>"));
4358    }
4359
4360    #[test]
4361    fn describe_orderable_db_instance_options_basic() {
4362        let svc = make_service();
4363        let req = request("DescribeOrderableDBInstanceOptions", &[("Engine", "mysql")]);
4364        let resp = svc.describe_orderable_db_instance_options(&req).unwrap();
4365        let body = body_of(resp);
4366        assert!(body.contains("<OrderableDBInstanceOptions>"));
4367    }
4368
4369    #[test]
4370    fn describe_db_parameter_group_not_found() {
4371        let svc = make_service();
4372        let req = request(
4373            "DescribeDBParameterGroups",
4374            &[("DBParameterGroupName", "ghost")],
4375        );
4376        assert_code(
4377            svc.describe_db_parameter_groups(&req),
4378            "DBParameterGroupNotFound",
4379        );
4380    }
4381
4382    #[test]
4383    fn delete_db_parameter_group_not_found() {
4384        let svc = make_service();
4385        let req = request(
4386            "DeleteDBParameterGroup",
4387            &[("DBParameterGroupName", "ghost")],
4388        );
4389        assert_code(
4390            svc.delete_db_parameter_group(&req),
4391            "DBParameterGroupNotFound",
4392        );
4393    }
4394
4395    #[test]
4396    fn describe_db_subnet_group_not_found() {
4397        let svc = make_service();
4398        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4399        assert_code(
4400            svc.describe_db_subnet_groups(&req),
4401            "DBSubnetGroupNotFoundFault",
4402        );
4403    }
4404
4405    #[test]
4406    fn delete_db_subnet_group_not_found() {
4407        let svc = make_service();
4408        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
4409        assert_code(
4410            svc.delete_db_subnet_group(&req),
4411            "DBSubnetGroupNotFoundFault",
4412        );
4413    }
4414
4415    #[test]
4416    fn add_tags_resource_not_found() {
4417        let svc = make_service();
4418        let req = request(
4419            "AddTagsToResource",
4420            &[
4421                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4422                ("Tags.member.1.Key", "k"),
4423                ("Tags.member.1.Value", "v"),
4424            ],
4425        );
4426        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
4427    }
4428
4429    #[test]
4430    fn list_tags_resource_not_found() {
4431        let svc = make_service();
4432        let req = request(
4433            "ListTagsForResource",
4434            &[("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost")],
4435        );
4436        assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
4437    }
4438
4439    // ── snapshot operations ──
4440
4441    #[tokio::test]
4442    async fn create_db_snapshot_missing_id_errors() {
4443        let svc = make_service();
4444        let req = request(
4445            "CreateDBSnapshot",
4446            &[("DBInstanceIdentifier", "nonexistent")],
4447        );
4448        assert_code(svc.create_db_snapshot(&req).await, "MissingParameter");
4449    }
4450
4451    #[tokio::test]
4452    async fn create_db_snapshot_unknown_instance_errors() {
4453        let svc = make_service();
4454        let req = request(
4455            "CreateDBSnapshot",
4456            &[
4457                ("DBSnapshotIdentifier", "snap1"),
4458                ("DBInstanceIdentifier", "ghost"),
4459            ],
4460        );
4461        assert!(svc.create_db_snapshot(&req).await.is_err());
4462    }
4463
4464    // ── delete_db_instance ──
4465
4466    #[tokio::test]
4467    async fn delete_db_instance_missing_id_errors() {
4468        let svc = make_service();
4469        let req = request("DeleteDBInstance", &[]);
4470        assert_code(svc.delete_db_instance(&req).await, "MissingParameter");
4471    }
4472
4473    // ── reboot_db_instance ──
4474
4475    #[tokio::test]
4476    async fn reboot_db_instance_missing_id_errors() {
4477        let svc = make_service();
4478        let req = request("RebootDBInstance", &[]);
4479        assert_code(svc.reboot_db_instance(&req).await, "MissingParameter");
4480    }
4481
4482    // ── create_db_instance validation ──
4483
4484    #[tokio::test]
4485    async fn create_db_instance_missing_id_errors() {
4486        let svc = make_service();
4487        let req = request(
4488            "CreateDBInstance",
4489            &[
4490                ("Engine", "postgres"),
4491                ("DBInstanceClass", "db.t3.micro"),
4492                ("AllocatedStorage", "20"),
4493                ("MasterUsername", "admin"),
4494                ("MasterUserPassword", "secretpass"),
4495            ],
4496        );
4497        assert!(svc.create_db_instance(&req).await.is_err());
4498    }
4499
4500    #[tokio::test]
4501    async fn create_db_instance_unsupported_engine_errors() {
4502        let svc = make_service();
4503        let req = request(
4504            "CreateDBInstance",
4505            &[
4506                ("DBInstanceIdentifier", "db1"),
4507                ("Engine", "mongodb"),
4508                ("DBInstanceClass", "db.t3.micro"),
4509                ("AllocatedStorage", "20"),
4510                ("MasterUsername", "admin"),
4511                ("MasterUserPassword", "secretpass"),
4512            ],
4513        );
4514        assert!(svc.create_db_instance(&req).await.is_err());
4515    }
4516
4517    // ── restore_db_instance_from_db_snapshot ──
4518
4519    #[tokio::test]
4520    async fn restore_db_instance_missing_ids_errors() {
4521        let svc = make_service();
4522        let req = request("RestoreDBInstanceFromDBSnapshot", &[]);
4523        assert!(svc
4524            .restore_db_instance_from_db_snapshot(&req)
4525            .await
4526            .is_err());
4527    }
4528
4529    #[tokio::test]
4530    async fn restore_db_instance_unknown_snapshot_errors() {
4531        let svc = make_service();
4532        let req = request(
4533            "RestoreDBInstanceFromDBSnapshot",
4534            &[
4535                ("DBInstanceIdentifier", "restored"),
4536                ("DBSnapshotIdentifier", "missing"),
4537            ],
4538        );
4539        assert!(svc
4540            .restore_db_instance_from_db_snapshot(&req)
4541            .await
4542            .is_err());
4543    }
4544
4545    // ── create_db_instance_read_replica ──
4546
4547    #[tokio::test]
4548    async fn create_read_replica_missing_source_errors() {
4549        let svc = make_service();
4550        let req = request(
4551            "CreateDBInstanceReadReplica",
4552            &[("DBInstanceIdentifier", "replica1")],
4553        );
4554        assert!(svc.create_db_instance_read_replica(&req).await.is_err());
4555    }
4556
4557    #[tokio::test]
4558    async fn create_read_replica_unknown_source_errors() {
4559        let svc = make_service();
4560        let req = request(
4561            "CreateDBInstanceReadReplica",
4562            &[
4563                ("DBInstanceIdentifier", "replica1"),
4564                ("SourceDBInstanceIdentifier", "ghost"),
4565            ],
4566        );
4567        assert!(svc.create_db_instance_read_replica(&req).await.is_err());
4568    }
4569
4570    // ── describe_db_snapshots with filters ──
4571
4572    #[test]
4573    fn describe_db_snapshots_by_snapshot_id_only() {
4574        let svc = make_service();
4575        seed_snapshot(&svc, "s1", "inst1");
4576        let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "s1")]);
4577        let resp = svc.describe_db_snapshots(&req).unwrap();
4578        let b = body_of(resp);
4579        assert!(b.contains("<DBSnapshotIdentifier>s1</DBSnapshotIdentifier>"));
4580    }
4581
4582    #[test]
4583    fn describe_db_snapshots_by_instance_id_returns_matching() {
4584        let svc = make_service();
4585        seed_snapshot(&svc, "s1", "inst1");
4586        seed_snapshot(&svc, "s2", "inst2");
4587        let req = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "inst1")]);
4588        let resp = svc.describe_db_snapshots(&req).unwrap();
4589        let b = body_of(resp);
4590        assert!(b.contains("s1"));
4591        assert!(!b.contains("<DBSnapshotIdentifier>s2</DBSnapshotIdentifier>"));
4592    }
4593
4594    // ── modify_db_parameter_group ──
4595
4596    #[test]
4597    fn modify_db_parameter_group_missing_name() {
4598        let svc = make_service();
4599        let req = request("ModifyDBParameterGroup", &[]);
4600        assert!(svc.modify_db_parameter_group(&req).is_err());
4601    }
4602
4603    // ── modify_db_subnet_group ──
4604
4605    #[test]
4606    fn modify_db_subnet_group_unknown_errors() {
4607        let svc = make_service();
4608        let req = request(
4609            "ModifyDBSubnetGroup",
4610            &[
4611                ("DBSubnetGroupName", "ghost"),
4612                ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4613                ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4614            ],
4615        );
4616        assert!(svc.modify_db_subnet_group(&req).is_err());
4617    }
4618
4619    // ── describe_db_instances ──
4620
4621    #[test]
4622    fn describe_db_instances_empty_returns_xml() {
4623        let svc = make_service();
4624        let req = request("DescribeDBInstances", &[]);
4625        let resp = svc.describe_db_instances(&req).unwrap();
4626        let b = body_of(resp);
4627        assert!(b.contains("DescribeDBInstancesResult"));
4628    }
4629
4630    #[test]
4631    fn describe_db_snapshots_empty_returns_empty_list() {
4632        let svc = make_service();
4633        let req = request("DescribeDBSnapshots", &[]);
4634        let resp = svc.describe_db_snapshots(&req).unwrap();
4635        let b = body_of(resp);
4636        assert!(b.contains("DescribeDBSnapshotsResult"));
4637    }
4638
4639    #[test]
4640    fn add_tags_unknown_resource_errors() {
4641        let svc = make_service();
4642        let req = request(
4643            "AddTagsToResource",
4644            &[
4645                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4646                ("Tags.member.1.Key", "k"),
4647                ("Tags.member.1.Value", "v"),
4648            ],
4649        );
4650        assert!(svc.add_tags_to_resource(&req).is_err());
4651    }
4652
4653    #[test]
4654    fn remove_tags_unknown_resource_errors() {
4655        let svc = make_service();
4656        let req = request(
4657            "RemoveTagsFromResource",
4658            &[
4659                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4660                ("TagKeys.member.1", "k"),
4661            ],
4662        );
4663        assert!(svc.remove_tags_from_resource(&req).is_err());
4664    }
4665
4666    #[test]
4667    fn create_db_parameter_group_missing_name_errors() {
4668        let svc = make_service();
4669        let req = request(
4670            "CreateDBParameterGroup",
4671            &[
4672                ("DBParameterGroupFamily", "postgres16"),
4673                ("Description", "d"),
4674            ],
4675        );
4676        assert!(svc.create_db_parameter_group(&req).is_err());
4677    }
4678
4679    #[test]
4680    fn create_db_subnet_group_missing_desc_errors() {
4681        let svc = make_service();
4682        let req = request(
4683            "CreateDBSubnetGroup",
4684            &[
4685                ("DBSubnetGroupName", "sg1"),
4686                ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4687                ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4688            ],
4689        );
4690        assert!(svc.create_db_subnet_group(&req).is_err());
4691    }
4692
4693    #[tokio::test]
4694    async fn create_db_instance_missing_class_errors() {
4695        let svc = make_service();
4696        let req = request(
4697            "CreateDBInstance",
4698            &[
4699                ("DBInstanceIdentifier", "miss-class"),
4700                ("Engine", "postgres"),
4701                ("AllocatedStorage", "20"),
4702                ("MasterUsername", "admin"),
4703                ("MasterUserPassword", "secretpass"),
4704            ],
4705        );
4706        assert!(svc.create_db_instance(&req).await.is_err());
4707    }
4708
4709    #[tokio::test]
4710    async fn create_db_instance_missing_master_username_errors() {
4711        let svc = make_service();
4712        let req = request(
4713            "CreateDBInstance",
4714            &[
4715                ("DBInstanceIdentifier", "miss-mu"),
4716                ("Engine", "postgres"),
4717                ("DBInstanceClass", "db.t3.micro"),
4718                ("AllocatedStorage", "20"),
4719                ("MasterUserPassword", "secretpass"),
4720            ],
4721        );
4722        assert!(svc.create_db_instance(&req).await.is_err());
4723    }
4724
4725    #[test]
4726    fn modify_db_instance_missing_id_errors() {
4727        let svc = make_service();
4728        let req = request("ModifyDBInstance", &[]);
4729        assert!(svc.modify_db_instance(&req).is_err());
4730    }
4731
4732    #[test]
4733    fn modify_db_parameter_group_unknown_pg_errors() {
4734        let svc = make_service();
4735        let req = request(
4736            "ModifyDBParameterGroup",
4737            &[
4738                ("DBParameterGroupName", "ghost"),
4739                ("Parameters.member.1.ParameterName", "p"),
4740                ("Parameters.member.1.ParameterValue", "v"),
4741                ("Parameters.member.1.ApplyMethod", "immediate"),
4742            ],
4743        );
4744        assert!(svc.modify_db_parameter_group(&req).is_err());
4745    }
4746
4747    #[test]
4748    fn describe_db_parameter_groups_unknown_errors() {
4749        let svc = make_service();
4750        let req = request(
4751            "DescribeDBParameterGroups",
4752            &[("DBParameterGroupName", "ghost")],
4753        );
4754        assert!(svc.describe_db_parameter_groups(&req).is_err());
4755    }
4756
4757    #[test]
4758    fn describe_db_subnet_groups_unknown_errors() {
4759        let svc = make_service();
4760        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4761        assert!(svc.describe_db_subnet_groups(&req).is_err());
4762    }
4763}