Skip to main content

fakecloud_rds/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_persistence::SnapshotStore;
14
15use crate::runtime::{RdsRuntime, RuntimeError};
16use crate::state::{
17    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
18    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
19    SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
20};
21
22const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
23
24fn is_mutating_action(action: &str) -> bool {
25    if matches!(
26        action,
27        "AddTagsToResource"
28            | "CreateDBInstance"
29            | "CreateDBInstanceReadReplica"
30            | "CreateDBParameterGroup"
31            | "CreateDBSnapshot"
32            | "CreateDBSubnetGroup"
33            | "DeleteDBInstance"
34            | "DeleteDBParameterGroup"
35            | "DeleteDBSnapshot"
36            | "DeleteDBSubnetGroup"
37            | "ModifyDBInstance"
38            | "ModifyDBParameterGroup"
39            | "ModifyDBSubnetGroup"
40            | "RebootDBInstance"
41            | "RemoveTagsFromResource"
42            | "RestoreDBInstanceFromDBSnapshot"
43    ) {
44        return true;
45    }
46    // Heuristic for the 140 extra ops: any verb that mutates state.
47    let mutating_prefixes = [
48        "Create",
49        "Modify",
50        "Delete",
51        "Reboot",
52        "Start",
53        "Stop",
54        "Failover",
55        "Switchover",
56        "Promote",
57        "Reset",
58        "Apply",
59        "Authorize",
60        "Revoke",
61        "Add",
62        "Remove",
63        "Register",
64        "Deregister",
65        "Copy",
66        "Restore",
67        "Backtrack",
68        "Cancel",
69        "Purchase",
70        "Disable",
71        "Enable",
72    ];
73    mutating_prefixes.iter().any(|p| action.starts_with(p))
74}
75const SUPPORTED_ACTIONS: &[&str] = &[
76    "AddRoleToDBCluster",
77    "AddRoleToDBInstance",
78    "AddSourceIdentifierToSubscription",
79    "AddTagsToResource",
80    "ApplyPendingMaintenanceAction",
81    "AuthorizeDBSecurityGroupIngress",
82    "BacktrackDBCluster",
83    "CancelExportTask",
84    "CopyDBClusterParameterGroup",
85    "CopyDBClusterSnapshot",
86    "CopyDBParameterGroup",
87    "CopyDBSnapshot",
88    "CopyOptionGroup",
89    "CreateBlueGreenDeployment",
90    "CreateCustomDBEngineVersion",
91    "CreateDBCluster",
92    "CreateDBClusterEndpoint",
93    "CreateDBClusterParameterGroup",
94    "CreateDBClusterSnapshot",
95    "CreateDBInstance",
96    "CreateDBInstanceReadReplica",
97    "CreateDBParameterGroup",
98    "CreateDBProxy",
99    "CreateDBProxyEndpoint",
100    "CreateDBSecurityGroup",
101    "CreateDBShardGroup",
102    "CreateDBSnapshot",
103    "CreateDBSubnetGroup",
104    "CreateEventSubscription",
105    "CreateGlobalCluster",
106    "CreateIntegration",
107    "CreateOptionGroup",
108    "CreateTenantDatabase",
109    "DeleteBlueGreenDeployment",
110    "DeleteCustomDBEngineVersion",
111    "DeleteDBCluster",
112    "DeleteDBClusterAutomatedBackup",
113    "DeleteDBClusterEndpoint",
114    "DeleteDBClusterParameterGroup",
115    "DeleteDBClusterSnapshot",
116    "DeleteDBInstance",
117    "DeleteDBInstanceAutomatedBackup",
118    "DeleteDBParameterGroup",
119    "DeleteDBProxy",
120    "DeleteDBProxyEndpoint",
121    "DeleteDBSecurityGroup",
122    "DeleteDBShardGroup",
123    "DeleteDBSnapshot",
124    "DeleteDBSubnetGroup",
125    "DeleteEventSubscription",
126    "DeleteGlobalCluster",
127    "DeleteIntegration",
128    "DeleteOptionGroup",
129    "DeleteTenantDatabase",
130    "DeregisterDBProxyTargets",
131    "DescribeAccountAttributes",
132    "DescribeBlueGreenDeployments",
133    "DescribeCertificates",
134    "DescribeDBClusterAutomatedBackups",
135    "DescribeDBClusterBacktracks",
136    "DescribeDBClusterEndpoints",
137    "DescribeDBClusterParameterGroups",
138    "DescribeDBClusterParameters",
139    "DescribeDBClusterSnapshotAttributes",
140    "DescribeDBClusterSnapshots",
141    "DescribeDBClusters",
142    "DescribeDBEngineVersions",
143    "DescribeDBInstanceAutomatedBackups",
144    "DescribeDBInstances",
145    "DescribeDBLogFiles",
146    "DescribeDBMajorEngineVersions",
147    "DescribeDBParameterGroups",
148    "DescribeDBParameters",
149    "DescribeDBProxies",
150    "DescribeDBProxyEndpoints",
151    "DescribeDBProxyTargetGroups",
152    "DescribeDBProxyTargets",
153    "DescribeDBRecommendations",
154    "DescribeDBSecurityGroups",
155    "DescribeDBShardGroups",
156    "DescribeDBSnapshotAttributes",
157    "DescribeDBSnapshotTenantDatabases",
158    "DescribeDBSnapshots",
159    "DescribeDBSubnetGroups",
160    "DescribeEngineDefaultClusterParameters",
161    "DescribeEngineDefaultParameters",
162    "DescribeEventCategories",
163    "DescribeEventSubscriptions",
164    "DescribeEvents",
165    "DescribeExportTasks",
166    "DescribeGlobalClusters",
167    "DescribeIntegrations",
168    "DescribeOptionGroupOptions",
169    "DescribeOptionGroups",
170    "DescribeOrderableDBInstanceOptions",
171    "DescribePendingMaintenanceActions",
172    "DescribeReservedDBInstances",
173    "DescribeReservedDBInstancesOfferings",
174    "DescribeSourceRegions",
175    "DescribeTenantDatabases",
176    "DescribeValidDBInstanceModifications",
177    "DisableHttpEndpoint",
178    "DownloadDBLogFilePortion",
179    "EnableHttpEndpoint",
180    "FailoverDBCluster",
181    "FailoverGlobalCluster",
182    "ListTagsForResource",
183    "ModifyActivityStream",
184    "ModifyCertificates",
185    "ModifyCurrentDBClusterCapacity",
186    "ModifyCustomDBEngineVersion",
187    "ModifyDBCluster",
188    "ModifyDBClusterEndpoint",
189    "ModifyDBClusterParameterGroup",
190    "ModifyDBClusterSnapshotAttribute",
191    "ModifyDBInstance",
192    "ModifyDBParameterGroup",
193    "ModifyDBProxy",
194    "ModifyDBProxyEndpoint",
195    "ModifyDBProxyTargetGroup",
196    "ModifyDBRecommendation",
197    "ModifyDBShardGroup",
198    "ModifyDBSnapshot",
199    "ModifyDBSnapshotAttribute",
200    "ModifyDBSubnetGroup",
201    "ModifyEventSubscription",
202    "ModifyGlobalCluster",
203    "ModifyIntegration",
204    "ModifyOptionGroup",
205    "ModifyTenantDatabase",
206    "PromoteReadReplica",
207    "PromoteReadReplicaDBCluster",
208    "PurchaseReservedDBInstancesOffering",
209    "RebootDBCluster",
210    "RebootDBInstance",
211    "RebootDBShardGroup",
212    "RegisterDBProxyTargets",
213    "RemoveFromGlobalCluster",
214    "RemoveRoleFromDBCluster",
215    "RemoveRoleFromDBInstance",
216    "RemoveSourceIdentifierFromSubscription",
217    "RemoveTagsFromResource",
218    "ResetDBClusterParameterGroup",
219    "ResetDBParameterGroup",
220    "RestoreDBClusterFromS3",
221    "RestoreDBClusterFromSnapshot",
222    "RestoreDBClusterToPointInTime",
223    "RestoreDBInstanceFromDBSnapshot",
224    "RestoreDBInstanceFromS3",
225    "RestoreDBInstanceToPointInTime",
226    "RevokeDBSecurityGroupIngress",
227    "StartActivityStream",
228    "StartDBCluster",
229    "StartDBInstance",
230    "StartDBInstanceAutomatedBackupsReplication",
231    "StartExportTask",
232    "StopActivityStream",
233    "StopDBCluster",
234    "StopDBInstance",
235    "StopDBInstanceAutomatedBackupsReplication",
236    "SwitchoverBlueGreenDeployment",
237    "SwitchoverGlobalCluster",
238    "SwitchoverReadReplica",
239];
240
241pub struct RdsService {
242    pub(crate) state: SharedRdsState,
243    runtime: Option<Arc<RdsRuntime>>,
244    snapshot_store: Option<Arc<dyn SnapshotStore>>,
245    snapshot_lock: Arc<AsyncMutex<()>>,
246    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
247}
248
249/// Source type for RDS EventBridge events. Maps `aws.rds` detail-type.
250#[derive(Clone, Copy)]
251#[allow(dead_code, clippy::enum_variant_names)]
252pub(crate) enum RdsSourceType {
253    DbInstance,
254    DbSnapshot,
255    DbParameterGroup,
256}
257
258impl RdsSourceType {
259    fn as_str(self) -> &'static str {
260        match self {
261            Self::DbInstance => "DB_INSTANCE",
262            Self::DbSnapshot => "DB_SNAPSHOT",
263            Self::DbParameterGroup => "DB_PARAMETER_GROUP",
264        }
265    }
266
267    fn detail_type(self) -> &'static str {
268        match self {
269            Self::DbInstance => "RDS DB Instance Event",
270            Self::DbSnapshot => "RDS DB Snapshot Event",
271            Self::DbParameterGroup => "RDS DB Parameter Group Event",
272        }
273    }
274}
275
276impl RdsService {
277    pub(crate) fn state_handle(&self) -> &SharedRdsState {
278        &self.state
279    }
280}
281
282impl RdsService {
283    pub fn new(state: SharedRdsState) -> Self {
284        Self {
285            state,
286            runtime: None,
287            snapshot_store: None,
288            snapshot_lock: Arc::new(AsyncMutex::new(())),
289            delivery_bus: None,
290        }
291    }
292
293    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
294        self.runtime = Some(runtime);
295        self
296    }
297
298    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
299        self.snapshot_store = Some(store);
300        self
301    }
302
303    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
304        self.delivery_bus = Some(bus);
305        self
306    }
307
308    /// Emit an `aws.rds` EventBridge event mirroring the AWS RDS event schema.
309    /// No-op when the delivery bus isn't wired (tests, minimal configs).
310    pub(crate) fn emit_event(
311        &self,
312        source_type: RdsSourceType,
313        source_identifier: &str,
314        source_arn: &str,
315        event_id: &str,
316        event_categories: &[&str],
317        message: &str,
318    ) {
319        let Some(ref bus) = self.delivery_bus else {
320            return;
321        };
322        let detail = serde_json::json!({
323            "EventCategories": event_categories,
324            "SourceType": source_type.as_str(),
325            "SourceArn": source_arn,
326            "Date": Utc::now().to_rfc3339(),
327            "Message": message,
328            "SourceIdentifier": source_identifier,
329            "EventID": event_id,
330        });
331        bus.put_event_to_eventbridge(
332            "aws.rds",
333            source_type.detail_type(),
334            &detail.to_string(),
335            "default",
336        );
337    }
338
339    async fn save_snapshot(&self) {
340        let Some(store) = self.snapshot_store.clone() else {
341            return;
342        };
343        let _guard = self.snapshot_lock.lock().await;
344        let snapshot = RdsSnapshot {
345            schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
346            state: None,
347            accounts: Some(self.state.read().clone()),
348        };
349        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
350            let bytes = serde_json::to_vec(&snapshot)
351                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
352            store.save(&bytes)
353        })
354        .await;
355        match join {
356            Ok(Ok(())) => {}
357            Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
358            Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
359        }
360    }
361
362    /// Return the runtime or a ``ServiceUnavailable`` error if it was not configured.
363    ///
364    /// RDS operations that start, stop, or reach into a database container fail
365    /// with a consistent wire error when the daemon (Docker/Podman) is missing
366    /// rather than each caller restating the message.
367    fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
368        self.runtime.as_ref().ok_or_else(|| {
369            AwsServiceError::aws_error(
370                StatusCode::SERVICE_UNAVAILABLE,
371                "InvalidParameterValue",
372                "Docker/Podman is required for RDS DB instances but is not available",
373            )
374        })
375    }
376}
377
378#[async_trait]
379impl AwsService for RdsService {
380    fn service_name(&self) -> &str {
381        "rds"
382    }
383
384    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
385        let mutates = is_mutating_action(request.action.as_str());
386        let result = match request.action.as_str() {
387            "AddTagsToResource" => self.add_tags_to_resource(&request),
388            "CreateDBInstance" => self.create_db_instance(&request).await,
389            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
390            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
391            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
392            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
393            "DeleteDBInstance" => self.delete_db_instance(&request).await,
394            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
395            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
396            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
397            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
398            "DescribeDBInstances" => self.describe_db_instances(&request),
399            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
400            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
401            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
402            "DescribeOrderableDBInstanceOptions" => {
403                self.describe_orderable_db_instance_options(&request)
404            }
405            "ListTagsForResource" => self.list_tags_for_resource(&request),
406            "ModifyDBInstance" => self.modify_db_instance(&request),
407            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
408            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
409            "RebootDBInstance" => self.reboot_db_instance(&request).await,
410            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
411            "RestoreDBInstanceFromDBSnapshot" => {
412                self.restore_db_instance_from_db_snapshot(&request).await
413            }
414            _ => self.handle_extra_action(&request),
415        };
416        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
417            self.save_snapshot().await;
418        }
419        result
420    }
421
422    fn supported_actions(&self) -> &[&str] {
423        SUPPORTED_ACTIONS
424    }
425}
426
427impl RdsService {
428    async fn create_db_instance(
429        &self,
430        request: &AwsRequest,
431    ) -> Result<AwsResponse, AwsServiceError> {
432        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
433        let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
434        let db_instance_class = required_param(request, "DBInstanceClass")?;
435        let engine = required_param(request, "Engine")?;
436        let master_username = required_param(request, "MasterUsername")?;
437        let master_user_password = required_param(request, "MasterUserPassword")?;
438        let db_name = optional_param(request, "DBName");
439        let engine_version =
440            optional_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
441        let publicly_accessible =
442            parse_optional_bool(optional_param(request, "PubliclyAccessible").as_deref())?
443                .unwrap_or(true);
444        let deletion_protection =
445            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?
446                .unwrap_or(false);
447        let port = optional_i32_param(request, "Port")?
448            .unwrap_or_else(|| default_port_for_engine(&engine));
449        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
450
451        let db_parameter_group_name = optional_param(request, "DBParameterGroupName")
452            .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
453
454        let backup_retention_period =
455            optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
456        let preferred_backup_window = optional_param(request, "PreferredBackupWindow")
457            .unwrap_or_else(|| "03:00-04:00".to_string());
458        let option_group_name = optional_param(request, "OptionGroupName");
459        let multi_az =
460            parse_optional_bool(optional_param(request, "MultiAZ").as_deref())?.unwrap_or(false);
461
462        validate_create_request(
463            &db_instance_identifier,
464            allocated_storage,
465            &db_instance_class,
466            &engine,
467            &engine_version,
468            port,
469        )?;
470
471        {
472            let mut accounts = self.state.write();
473            let state = accounts.get_or_create(&request.account_id);
474            if !state.begin_instance_creation(&db_instance_identifier) {
475                return Err(AwsServiceError::aws_error(
476                    StatusCode::BAD_REQUEST,
477                    "DBInstanceAlreadyExists",
478                    format!("DBInstance {} already exists.", db_instance_identifier),
479                ));
480            }
481            // Validate parameter group exists if specified by the caller
482            if let Some(ref pg_name) = db_parameter_group_name {
483                if !state.parameter_groups.contains_key(pg_name) {
484                    state.cancel_instance_creation(&db_instance_identifier);
485                    return Err(AwsServiceError::aws_error(
486                        StatusCode::NOT_FOUND,
487                        "DBParameterGroupNotFound",
488                        format!("DBParameterGroup {} not found.", pg_name),
489                    ));
490                }
491            }
492        }
493
494        let runtime = self.require_runtime()?;
495
496        let logical_db_name = db_name
497            .clone()
498            .unwrap_or_else(|| default_db_name(&engine).to_string());
499        let running = runtime
500            .ensure_postgres(
501                &db_instance_identifier,
502                &engine,
503                &engine_version,
504                &master_username,
505                &master_user_password,
506                &logical_db_name,
507            )
508            .await
509            .map_err(|error| {
510                self.state
511                    .write()
512                    .get_or_create(&request.account_id)
513                    .cancel_instance_creation(&db_instance_identifier);
514                runtime_error_to_service_error(error)
515            })?;
516
517        let mut accounts = self.state.write();
518        let state = accounts.get_or_create(&request.account_id);
519        let created_at = Utc::now();
520        let instance = DbInstance {
521            db_instance_identifier: db_instance_identifier.clone(),
522            db_instance_arn: state.db_instance_arn(&db_instance_identifier),
523            db_instance_class: db_instance_class.clone(),
524            engine: engine.clone(),
525            engine_version: engine_version.clone(),
526            db_instance_status: "available".to_string(),
527            master_username: master_username.clone(),
528            db_name: db_name.clone(),
529            endpoint_address: "127.0.0.1".to_string(),
530            port: i32::from(running.host_port),
531            allocated_storage,
532            publicly_accessible,
533            deletion_protection,
534            created_at,
535            dbi_resource_id: state.next_dbi_resource_id(),
536            master_user_password,
537            container_id: running.container_id,
538            host_port: running.host_port,
539            tags: Vec::new(),
540            read_replica_source_db_instance_identifier: None,
541            read_replica_db_instance_identifiers: Vec::new(),
542            vpc_security_group_ids,
543            db_parameter_group_name,
544            backup_retention_period,
545            preferred_backup_window,
546            latest_restorable_time: if backup_retention_period > 0 {
547                Some(created_at)
548            } else {
549                None
550            },
551            option_group_name,
552            multi_az,
553            pending_modified_values: None,
554        };
555        state.finish_instance_creation(instance.clone());
556        let instance_arn = instance.db_instance_arn.clone();
557        drop(accounts);
558
559        self.emit_event(
560            RdsSourceType::DbInstance,
561            &db_instance_identifier,
562            &instance_arn,
563            "RDS-EVENT-0005",
564            &["creation"],
565            "DB instance created",
566        );
567
568        Ok(AwsResponse::xml(
569            StatusCode::OK,
570            xml_wrap(
571                "CreateDBInstance",
572                &format!(
573                    "<DBInstance>{}</DBInstance>",
574                    db_instance_xml(&instance, Some("creating"))
575                ),
576                &request.request_id,
577            ),
578        ))
579    }
580
581    async fn delete_db_instance(
582        &self,
583        request: &AwsRequest,
584    ) -> Result<AwsResponse, AwsServiceError> {
585        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
586        let skip_final_snapshot =
587            parse_optional_bool(optional_param(request, "SkipFinalSnapshot").as_deref())?
588                .unwrap_or(false);
589        let final_db_snapshot_identifier = optional_param(request, "FinalDBSnapshotIdentifier");
590
591        if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
592            return Err(AwsServiceError::aws_error(
593                StatusCode::BAD_REQUEST,
594                "InvalidParameterCombination",
595                "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
596            ));
597        }
598        if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
599            return Err(AwsServiceError::aws_error(
600                StatusCode::BAD_REQUEST,
601                "InvalidParameterCombination",
602                "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
603            ));
604        }
605
606        // Check deletion protection BEFORE creating snapshot or making any changes
607        {
608            let accounts = self.state.read();
609            let empty = RdsState::new(&request.account_id, &request.region);
610            let state = accounts.get(&request.account_id).unwrap_or(&empty);
611            if let Some(instance) = state.instances.get(&db_instance_identifier) {
612                if instance.deletion_protection {
613                    return Err(AwsServiceError::aws_error(
614                        StatusCode::BAD_REQUEST,
615                        "InvalidDBInstanceState",
616                        format!(
617                            "DBInstance {} cannot be deleted because deletion protection is enabled.",
618                            db_instance_identifier
619                        ),
620                    ));
621                }
622            } else {
623                return Err(db_instance_not_found(&db_instance_identifier));
624            }
625        }
626
627        if let Some(ref snapshot_id) = final_db_snapshot_identifier {
628            self.create_final_db_snapshot(
629                &db_instance_identifier,
630                snapshot_id,
631                &request.account_id,
632                &request.region,
633            )
634            .await?;
635        }
636
637        let instance = {
638            let mut accounts = self.state.write();
639            let state = accounts.get_or_create(&request.account_id);
640            let instance = state
641                .instances
642                .remove(&db_instance_identifier)
643                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
644
645            if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
646                if let Some(source) = state.instances.get_mut(source_id) {
647                    source
648                        .read_replica_db_instance_identifiers
649                        .retain(|id| id != &db_instance_identifier);
650                }
651            }
652
653            for replica_id in &instance.read_replica_db_instance_identifiers {
654                if let Some(replica) = state.instances.get_mut(replica_id) {
655                    replica.read_replica_source_db_instance_identifier = None;
656                }
657            }
658
659            instance
660        };
661
662        if let Some(runtime) = &self.runtime {
663            runtime.stop_container(&db_instance_identifier).await;
664        }
665
666        self.emit_event(
667            RdsSourceType::DbInstance,
668            &db_instance_identifier,
669            &instance.db_instance_arn,
670            "RDS-EVENT-0003",
671            &["deletion"],
672            "DB instance deleted",
673        );
674
675        Ok(AwsResponse::xml(
676            StatusCode::OK,
677            xml_wrap(
678                "DeleteDBInstance",
679                &format!(
680                    "<DBInstance>{}</DBInstance>",
681                    db_instance_xml(&instance, Some("deleting"))
682                ),
683                &request.request_id,
684            ),
685        ))
686    }
687
688    /// Take a final snapshot of an instance that is about to be deleted,
689    /// persisting the dumped database into `state.snapshots`. The DLQ-style
690    /// conflict check runs twice — once under the read lock before paying
691    /// for the dump, once under the write lock before committing — to keep
692    /// concurrent deletes from colliding.
693    async fn create_final_db_snapshot(
694        &self,
695        db_instance_identifier: &str,
696        snapshot_id: &str,
697        account_id: &str,
698        region: &str,
699    ) -> Result<(), AwsServiceError> {
700        let runtime = self.runtime.as_ref().ok_or_else(|| {
701            AwsServiceError::aws_error(
702                StatusCode::SERVICE_UNAVAILABLE,
703                "InvalidParameterValue",
704                "Docker/Podman is required for RDS snapshots but is not available",
705            )
706        })?;
707
708        let (instance_for_snapshot, db_name) = {
709            let accounts = self.state.read();
710            let empty = RdsState::new(account_id, region);
711            let state = accounts.get(account_id).unwrap_or(&empty);
712
713            if state.snapshots.contains_key(snapshot_id) {
714                return Err(AwsServiceError::aws_error(
715                    StatusCode::CONFLICT,
716                    "DBSnapshotAlreadyExists",
717                    format!("DBSnapshot {snapshot_id} already exists."),
718                ));
719            }
720
721            let instance = state
722                .instances
723                .get(db_instance_identifier)
724                .cloned()
725                .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
726
727            let default_db = default_db_name(&instance.engine);
728            let db_name = instance
729                .db_name
730                .as_deref()
731                .unwrap_or(default_db)
732                .to_string();
733
734            (instance, db_name)
735        };
736
737        let dump_data = runtime
738            .dump_database(
739                db_instance_identifier,
740                &instance_for_snapshot.engine,
741                &instance_for_snapshot.master_username,
742                &instance_for_snapshot.master_user_password,
743                &db_name,
744            )
745            .await
746            .map_err(runtime_error_to_service_error)?;
747
748        let mut accounts = self.state.write();
749        let state = accounts.get_or_create(account_id);
750
751        if state.snapshots.contains_key(snapshot_id) {
752            return Err(AwsServiceError::aws_error(
753                StatusCode::CONFLICT,
754                "DBSnapshotAlreadyExists",
755                format!("DBSnapshot {snapshot_id} already exists."),
756            ));
757        }
758
759        let snapshot_arn = state.db_snapshot_arn(snapshot_id);
760
761        let snapshot = DbSnapshot {
762            db_snapshot_identifier: snapshot_id.to_string(),
763            db_snapshot_arn: snapshot_arn,
764            db_instance_identifier: db_instance_identifier.to_string(),
765            snapshot_create_time: Utc::now(),
766            engine: instance_for_snapshot.engine.clone(),
767            engine_version: instance_for_snapshot.engine_version.clone(),
768            allocated_storage: instance_for_snapshot.allocated_storage,
769            status: "available".to_string(),
770            port: instance_for_snapshot.port,
771            master_username: instance_for_snapshot.master_username.clone(),
772            db_name: instance_for_snapshot.db_name.clone(),
773            dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
774            snapshot_type: "manual".to_string(),
775            master_user_password: instance_for_snapshot.master_user_password.clone(),
776            tags: Vec::new(),
777            dump_data,
778        };
779
780        state.snapshots.insert(snapshot_id.to_string(), snapshot);
781        Ok(())
782    }
783
784    fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
785        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
786        let db_instance_class = optional_param(request, "DBInstanceClass");
787        let deletion_protection =
788            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?;
789        let apply_immediately =
790            parse_optional_bool(optional_param(request, "ApplyImmediately").as_deref())?;
791
792        // Parse VPC security group IDs - only if at least one is provided
793        let vpc_security_group_ids = {
794            let mut ids = Vec::new();
795            for index in 1.. {
796                let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
797                match optional_param(request, &sg_id_name) {
798                    Some(sg_id) => ids.push(sg_id),
799                    None => break,
800                }
801            }
802            if ids.is_empty() {
803                None
804            } else {
805                Some(ids)
806            }
807        };
808
809        if db_instance_class.is_none()
810            && deletion_protection.is_none()
811            && vpc_security_group_ids.is_none()
812        {
813            return Err(AwsServiceError::aws_error(
814                StatusCode::BAD_REQUEST,
815                "InvalidParameterCombination",
816                "At least one supported mutable field must be provided.",
817            ));
818        }
819        if let Some(ref class) = db_instance_class {
820            validate_db_instance_class(class)?;
821        }
822
823        let mut accounts = self.state.write();
824        let state = accounts.get_or_create(&request.account_id);
825        let instance = state
826            .instances
827            .get_mut(&db_instance_identifier)
828            .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
829
830        // If ApplyImmediately is false, stage changes as pending
831        if apply_immediately == Some(false) {
832            let pending = instance
833                .pending_modified_values
834                .get_or_insert(Default::default());
835            if let Some(class) = db_instance_class {
836                pending.db_instance_class = Some(class);
837            }
838            // Note: deletion_protection and vpc_security_group_ids are applied immediately
839            // regardless of ApplyImmediately flag (per AWS behavior)
840            if let Some(deletion_protection) = deletion_protection {
841                instance.deletion_protection = deletion_protection;
842            }
843            if let Some(security_group_ids) = vpc_security_group_ids {
844                instance.vpc_security_group_ids = security_group_ids;
845            }
846        } else {
847            // Apply immediately (default behavior)
848            if let Some(class) = db_instance_class {
849                instance.db_instance_class = class;
850            }
851            if let Some(deletion_protection) = deletion_protection {
852                instance.deletion_protection = deletion_protection;
853            }
854            if let Some(security_group_ids) = vpc_security_group_ids {
855                instance.vpc_security_group_ids = security_group_ids;
856            }
857        }
858        let instance_arn = instance.db_instance_arn.clone();
859        let xml = xml_wrap(
860            "ModifyDBInstance",
861            &format!(
862                "<DBInstance>{}</DBInstance>",
863                db_instance_xml(instance, Some("modifying"))
864            ),
865            &request.request_id,
866        );
867        drop(accounts);
868
869        self.emit_event(
870            RdsSourceType::DbInstance,
871            &db_instance_identifier,
872            &instance_arn,
873            "RDS-EVENT-0014",
874            &["configuration change"],
875            "DB instance was modified",
876        );
877
878        Ok(AwsResponse::xml(StatusCode::OK, xml))
879    }
880
881    async fn reboot_db_instance(
882        &self,
883        request: &AwsRequest,
884    ) -> Result<AwsResponse, AwsServiceError> {
885        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
886        let force_failover =
887            parse_optional_bool(optional_param(request, "ForceFailover").as_deref())?;
888        if force_failover == Some(true) {
889            return Err(AwsServiceError::aws_error(
890                StatusCode::BAD_REQUEST,
891                "InvalidParameterCombination",
892                "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
893            ));
894        }
895
896        let instance = {
897            let accounts = self.state.read();
898            let empty = RdsState::new(&request.account_id, &request.region);
899            let state = accounts.get(&request.account_id).unwrap_or(&empty);
900            state
901                .instances
902                .get(&db_instance_identifier)
903                .cloned()
904                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
905        };
906
907        let runtime = self.require_runtime()?;
908
909        let running = runtime
910            .restart_container(
911                &db_instance_identifier,
912                &instance.engine,
913                &instance.master_username,
914                &instance.master_user_password,
915                instance
916                    .db_name
917                    .as_deref()
918                    .unwrap_or(default_db_name(&instance.engine)),
919            )
920            .await
921            .map_err(runtime_error_to_service_error)?;
922
923        let instance = {
924            let mut accounts = self.state.write();
925            let state = accounts.get_or_create(&request.account_id);
926            let instance = state
927                .instances
928                .get_mut(&db_instance_identifier)
929                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
930            instance.host_port = running.host_port;
931            instance.port = i32::from(running.host_port);
932
933            // Apply any pending modifications
934            if let Some(pending) = instance.pending_modified_values.take() {
935                if let Some(class) = pending.db_instance_class {
936                    instance.db_instance_class = class;
937                }
938                if let Some(allocated_storage) = pending.allocated_storage {
939                    instance.allocated_storage = allocated_storage;
940                }
941                if let Some(backup_retention_period) = pending.backup_retention_period {
942                    instance.backup_retention_period = backup_retention_period;
943                }
944                if let Some(multi_az) = pending.multi_az {
945                    instance.multi_az = multi_az;
946                }
947                if let Some(engine_version) = pending.engine_version {
948                    instance.engine_version = engine_version;
949                }
950                if let Some(master_user_password) = pending.master_user_password {
951                    instance.master_user_password = master_user_password;
952                }
953            }
954
955            instance.clone()
956        };
957
958        self.emit_event(
959            RdsSourceType::DbInstance,
960            &db_instance_identifier,
961            &instance.db_instance_arn,
962            "RDS-EVENT-0006",
963            &["availability"],
964            "DB instance restarted",
965        );
966
967        Ok(AwsResponse::xml(
968            StatusCode::OK,
969            xml_wrap(
970                "RebootDBInstance",
971                &format!(
972                    "<DBInstance>{}</DBInstance>",
973                    db_instance_xml(&instance, Some("rebooting"))
974                ),
975                &request.request_id,
976            ),
977        ))
978    }
979
980    fn describe_db_engine_versions(
981        &self,
982        request: &AwsRequest,
983    ) -> Result<AwsResponse, AwsServiceError> {
984        let engine = optional_param(request, "Engine");
985        let engine_version = optional_param(request, "EngineVersion");
986        let family = optional_param(request, "DBParameterGroupFamily");
987        let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
988
989        let mut versions = filter_engine_versions(
990            &default_engine_versions(),
991            &engine,
992            &engine_version,
993            &family,
994        );
995
996        if default_only.unwrap_or(false) {
997            versions.truncate(1);
998        }
999
1000        Ok(AwsResponse::xml(
1001            StatusCode::OK,
1002            xml_wrap(
1003                "DescribeDBEngineVersions",
1004                &format!(
1005                    "<DBEngineVersions>{}</DBEngineVersions>",
1006                    versions.iter().map(engine_version_xml).collect::<String>()
1007                ),
1008                &request.request_id,
1009            ),
1010        ))
1011    }
1012
1013    fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1014        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1015        let marker = optional_param(request, "Marker");
1016        let max_records = optional_param(request, "MaxRecords");
1017
1018        let accounts = self.state.read();
1019        let empty = RdsState::new(&request.account_id, &request.region);
1020        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1021
1022        // If specific identifier requested, return just that one (no pagination)
1023        if let Some(identifier) = db_instance_identifier {
1024            let instance = state
1025                .instances
1026                .get(&identifier)
1027                .cloned()
1028                .ok_or_else(|| db_instance_not_found(&identifier))?;
1029
1030            return Ok(AwsResponse::xml(
1031                StatusCode::OK,
1032                xml_wrap(
1033                    "DescribeDBInstances",
1034                    &format!(
1035                        "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
1036                        db_instance_xml(&instance, None)
1037                    ),
1038                    &request.request_id,
1039                ),
1040            ));
1041        }
1042
1043        // Get all instances sorted by created_at, then identifier
1044        let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
1045        instances.sort_by(|a, b| {
1046            a.created_at
1047                .cmp(&b.created_at)
1048                .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
1049        });
1050
1051        // Apply pagination
1052        let paginated = paginate(instances, marker, max_records, |inst| {
1053            &inst.db_instance_identifier
1054        })?;
1055
1056        let marker_xml = paginated
1057            .next_marker
1058            .as_ref()
1059            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1060            .unwrap_or_default();
1061
1062        Ok(AwsResponse::xml(
1063            StatusCode::OK,
1064            xml_wrap(
1065                "DescribeDBInstances",
1066                &format!(
1067                    "<DBInstances>{}</DBInstances>{}",
1068                    paginated
1069                        .items
1070                        .iter()
1071                        .map(|instance| {
1072                            format!(
1073                                "<DBInstance>{}</DBInstance>",
1074                                db_instance_xml(instance, None)
1075                            )
1076                        })
1077                        .collect::<String>(),
1078                    marker_xml
1079                ),
1080                &request.request_id,
1081            ),
1082        ))
1083    }
1084
1085    fn describe_orderable_db_instance_options(
1086        &self,
1087        request: &AwsRequest,
1088    ) -> Result<AwsResponse, AwsServiceError> {
1089        let engine = optional_param(request, "Engine");
1090        let engine_version = optional_param(request, "EngineVersion");
1091        let db_instance_class = optional_param(request, "DBInstanceClass");
1092        let license_model = optional_param(request, "LicenseModel");
1093        let vpc = parse_optional_bool(optional_param(request, "Vpc").as_deref())?;
1094
1095        let options = filter_orderable_options(
1096            &default_orderable_options(),
1097            &engine,
1098            &engine_version,
1099            &db_instance_class,
1100            &license_model,
1101            vpc,
1102        );
1103
1104        Ok(AwsResponse::xml(
1105            StatusCode::OK,
1106            xml_wrap(
1107                "DescribeOrderableDBInstanceOptions",
1108                &format!(
1109                    "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
1110                    options.iter().map(orderable_option_xml).collect::<String>()
1111                ),
1112                &request.request_id,
1113            ),
1114        ))
1115    }
1116
1117    fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1118        let resource_name = required_param(request, "ResourceName")?;
1119        let tags = parse_tags(request)?;
1120
1121        if tags.is_empty() {
1122            return Err(AwsServiceError::aws_error(
1123                StatusCode::BAD_REQUEST,
1124                "MissingParameter",
1125                "The request must contain the parameter Tags.",
1126            ));
1127        }
1128
1129        let mut accounts = self.state.write();
1130        let state = accounts.get_or_create(&request.account_id);
1131        let instance = find_instance_by_arn_mut(state, &resource_name)?;
1132        merge_tags(&mut instance.tags, &tags);
1133
1134        Ok(AwsResponse::xml(
1135            StatusCode::OK,
1136            xml_wrap("AddTagsToResource", "", &request.request_id),
1137        ))
1138    }
1139
1140    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1141        let resource_name = required_param(request, "ResourceName")?;
1142        if query_param_prefix_exists(request, "Filters.") {
1143            return Err(AwsServiceError::aws_error(
1144                StatusCode::BAD_REQUEST,
1145                "InvalidParameterValue",
1146                "Filters are not yet supported for ListTagsForResource.",
1147            ));
1148        }
1149
1150        let accounts = self.state.read();
1151        let empty = RdsState::new(&request.account_id, &request.region);
1152        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1153        let instance = find_instance_by_arn(state, &resource_name)?;
1154        let tag_xml = instance.tags.iter().map(tag_xml).collect::<String>();
1155
1156        Ok(AwsResponse::xml(
1157            StatusCode::OK,
1158            xml_wrap(
1159                "ListTagsForResource",
1160                &format!("<TagList>{tag_xml}</TagList>"),
1161                &request.request_id,
1162            ),
1163        ))
1164    }
1165
1166    fn remove_tags_from_resource(
1167        &self,
1168        request: &AwsRequest,
1169    ) -> Result<AwsResponse, AwsServiceError> {
1170        let resource_name = required_param(request, "ResourceName")?;
1171        let tag_keys = parse_tag_keys(request)?;
1172
1173        if tag_keys.is_empty() {
1174            return Err(AwsServiceError::aws_error(
1175                StatusCode::BAD_REQUEST,
1176                "MissingParameter",
1177                "The request must contain the parameter TagKeys.",
1178            ));
1179        }
1180
1181        let mut accounts = self.state.write();
1182        let state = accounts.get_or_create(&request.account_id);
1183        let instance = find_instance_by_arn_mut(state, &resource_name)?;
1184        instance
1185            .tags
1186            .retain(|tag| !tag_keys.iter().any(|key| key == &tag.key));
1187
1188        Ok(AwsResponse::xml(
1189            StatusCode::OK,
1190            xml_wrap("RemoveTagsFromResource", "", &request.request_id),
1191        ))
1192    }
1193
1194    async fn create_db_snapshot(
1195        &self,
1196        request: &AwsRequest,
1197    ) -> Result<AwsResponse, AwsServiceError> {
1198        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1199        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1200
1201        let runtime = self.runtime.as_ref().ok_or_else(|| {
1202            AwsServiceError::aws_error(
1203                StatusCode::SERVICE_UNAVAILABLE,
1204                "InvalidParameterValue",
1205                "Docker/Podman is required for RDS snapshots but is not available",
1206            )
1207        })?;
1208
1209        let (instance, db_name) = {
1210            let accounts = self.state.read();
1211            let empty = RdsState::new(&request.account_id, &request.region);
1212            let state = accounts.get(&request.account_id).unwrap_or(&empty);
1213
1214            if state.snapshots.contains_key(&db_snapshot_identifier) {
1215                return Err(AwsServiceError::aws_error(
1216                    StatusCode::CONFLICT,
1217                    "DBSnapshotAlreadyExists",
1218                    format!("DBSnapshot {db_snapshot_identifier} already exists."),
1219                ));
1220            }
1221
1222            let instance = state
1223                .instances
1224                .get(&db_instance_identifier)
1225                .cloned()
1226                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1227
1228            let default_db = default_db_name(&instance.engine);
1229            let db_name = instance
1230                .db_name
1231                .as_deref()
1232                .unwrap_or(default_db)
1233                .to_string();
1234
1235            (instance, db_name)
1236        };
1237
1238        let dump_data = runtime
1239            .dump_database(
1240                &db_instance_identifier,
1241                &instance.engine,
1242                &instance.master_username,
1243                &instance.master_user_password,
1244                &db_name,
1245            )
1246            .await
1247            .map_err(runtime_error_to_service_error)?;
1248
1249        let mut accounts = self.state.write();
1250        let state = accounts.get_or_create(&request.account_id);
1251
1252        if state.snapshots.contains_key(&db_snapshot_identifier) {
1253            return Err(AwsServiceError::aws_error(
1254                StatusCode::CONFLICT,
1255                "DBSnapshotAlreadyExists",
1256                format!("DBSnapshot {db_snapshot_identifier} already exists."),
1257            ));
1258        }
1259
1260        let snapshot = DbSnapshot {
1261            db_snapshot_identifier: db_snapshot_identifier.clone(),
1262            db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
1263            db_instance_identifier: instance.db_instance_identifier.clone(),
1264            snapshot_create_time: Utc::now(),
1265            engine: instance.engine.clone(),
1266            engine_version: instance.engine_version.clone(),
1267            allocated_storage: instance.allocated_storage,
1268            status: "available".to_string(),
1269            port: instance.port,
1270            master_username: instance.master_username.clone(),
1271            db_name: instance.db_name.clone(),
1272            dbi_resource_id: instance.dbi_resource_id.clone(),
1273            snapshot_type: "manual".to_string(),
1274            master_user_password: instance.master_user_password.clone(),
1275            tags: Vec::new(),
1276            dump_data,
1277        };
1278
1279        state
1280            .snapshots
1281            .insert(db_snapshot_identifier.clone(), snapshot.clone());
1282        let snapshot_arn = snapshot.db_snapshot_arn.clone();
1283        drop(accounts);
1284
1285        self.emit_event(
1286            RdsSourceType::DbSnapshot,
1287            &db_snapshot_identifier,
1288            &snapshot_arn,
1289            "RDS-EVENT-0042",
1290            &["creation"],
1291            "Manual snapshot created",
1292        );
1293
1294        Ok(AwsResponse::xml(
1295            StatusCode::OK,
1296            xml_wrap(
1297                "CreateDBSnapshot",
1298                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1299                &request.request_id,
1300            ),
1301        ))
1302    }
1303
1304    fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1305        let db_snapshot_identifier = optional_param(request, "DBSnapshotIdentifier");
1306        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1307        let marker = optional_param(request, "Marker");
1308        let max_records = optional_param(request, "MaxRecords");
1309
1310        if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
1311            return Err(AwsServiceError::aws_error(
1312                StatusCode::BAD_REQUEST,
1313                "InvalidParameterCombination",
1314                "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
1315            ));
1316        }
1317
1318        let accounts = self.state.read();
1319        let empty = RdsState::new(&request.account_id, &request.region);
1320        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1321
1322        // If specific snapshot requested, return just that one (no pagination)
1323        if let Some(snapshot_id) = db_snapshot_identifier {
1324            let snapshot = state
1325                .snapshots
1326                .get(&snapshot_id)
1327                .cloned()
1328                .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
1329
1330            return Ok(AwsResponse::xml(
1331                StatusCode::OK,
1332                xml_wrap(
1333                    "DescribeDBSnapshots",
1334                    &format!(
1335                        "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
1336                        db_snapshot_xml(&snapshot)
1337                    ),
1338                    &request.request_id,
1339                ),
1340            ));
1341        }
1342
1343        // Get snapshots, filtered by instance identifier if provided
1344        let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
1345            state
1346                .snapshots
1347                .values()
1348                .filter(|s| s.db_instance_identifier == instance_id)
1349                .cloned()
1350                .collect()
1351        } else {
1352            state.snapshots.values().cloned().collect()
1353        };
1354
1355        // Sort by creation time, then identifier
1356        snapshots.sort_by(|a, b| {
1357            a.snapshot_create_time
1358                .cmp(&b.snapshot_create_time)
1359                .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
1360        });
1361
1362        // Apply pagination
1363        let paginated = paginate(snapshots, marker, max_records, |snap| {
1364            &snap.db_snapshot_identifier
1365        })?;
1366
1367        let marker_xml = paginated
1368            .next_marker
1369            .as_ref()
1370            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1371            .unwrap_or_default();
1372
1373        Ok(AwsResponse::xml(
1374            StatusCode::OK,
1375            xml_wrap(
1376                "DescribeDBSnapshots",
1377                &format!(
1378                    "<DBSnapshots>{}</DBSnapshots>{}",
1379                    paginated
1380                        .items
1381                        .iter()
1382                        .map(|snapshot| format!(
1383                            "<DBSnapshot>{}</DBSnapshot>",
1384                            db_snapshot_xml(snapshot)
1385                        ))
1386                        .collect::<String>(),
1387                    marker_xml
1388                ),
1389                &request.request_id,
1390            ),
1391        ))
1392    }
1393
1394    fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1395        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1396
1397        let mut accounts = self.state.write();
1398        let state = accounts.get_or_create(&request.account_id);
1399
1400        let snapshot = state
1401            .snapshots
1402            .remove(&db_snapshot_identifier)
1403            .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1404        let snapshot_arn = snapshot.db_snapshot_arn.clone();
1405        drop(accounts);
1406
1407        self.emit_event(
1408            RdsSourceType::DbSnapshot,
1409            &db_snapshot_identifier,
1410            &snapshot_arn,
1411            "RDS-EVENT-0041",
1412            &["deletion"],
1413            "Manual snapshot deleted",
1414        );
1415
1416        Ok(AwsResponse::xml(
1417            StatusCode::OK,
1418            xml_wrap(
1419                "DeleteDBSnapshot",
1420                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1421                &request.request_id,
1422            ),
1423        ))
1424    }
1425
1426    async fn restore_db_instance_from_db_snapshot(
1427        &self,
1428        request: &AwsRequest,
1429    ) -> Result<AwsResponse, AwsServiceError> {
1430        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1431        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1432        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
1433
1434        let runtime = self.require_runtime()?;
1435
1436        let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
1437            let mut accounts = self.state.write();
1438            let state = accounts.get_or_create(&request.account_id);
1439
1440            if !state.begin_instance_creation(&db_instance_identifier) {
1441                return Err(AwsServiceError::aws_error(
1442                    StatusCode::CONFLICT,
1443                    "DBInstanceAlreadyExists",
1444                    format!("DBInstance {db_instance_identifier} already exists."),
1445                ));
1446            }
1447
1448            let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
1449                Some(s) => s,
1450                None => {
1451                    state.cancel_instance_creation(&db_instance_identifier);
1452                    return Err(db_snapshot_not_found(&db_snapshot_identifier));
1453                }
1454            };
1455
1456            let dbi_resource_id = state.next_dbi_resource_id();
1457            let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
1458            let created_at = Utc::now();
1459
1460            (snapshot, dbi_resource_id, db_instance_arn, created_at)
1461        };
1462
1463        let db_name = snapshot
1464            .db_name
1465            .as_deref()
1466            .unwrap_or(default_db_name(&snapshot.engine));
1467        let running = match runtime
1468            .ensure_postgres(
1469                &db_instance_identifier,
1470                &snapshot.engine,
1471                &snapshot.engine_version,
1472                &snapshot.master_username,
1473                &snapshot.master_user_password,
1474                db_name,
1475            )
1476            .await
1477        {
1478            Ok(running) => running,
1479            Err(e) => {
1480                self.state
1481                    .write()
1482                    .get_or_create(&request.account_id)
1483                    .cancel_instance_creation(&db_instance_identifier);
1484                return Err(runtime_error_to_service_error(e));
1485            }
1486        };
1487
1488        if let Err(e) = runtime
1489            .restore_database(
1490                &db_instance_identifier,
1491                &snapshot.engine,
1492                &snapshot.master_username,
1493                &snapshot.master_user_password,
1494                db_name,
1495                &snapshot.dump_data,
1496            )
1497            .await
1498        {
1499            self.state
1500                .write()
1501                .get_or_create(&request.account_id)
1502                .cancel_instance_creation(&db_instance_identifier);
1503            runtime.stop_container(&db_instance_identifier).await;
1504            return Err(runtime_error_to_service_error(e));
1505        }
1506
1507        let instance = build_restored_instance(
1508            &db_instance_identifier,
1509            db_instance_arn,
1510            dbi_resource_id,
1511            created_at,
1512            vpc_security_group_ids,
1513            &snapshot,
1514            &running,
1515        );
1516
1517        self.state
1518            .write()
1519            .get_or_create(&request.account_id)
1520            .finish_instance_creation(instance.clone());
1521
1522        self.emit_event(
1523            RdsSourceType::DbInstance,
1524            &db_instance_identifier,
1525            &instance.db_instance_arn,
1526            "RDS-EVENT-0043",
1527            &["creation"],
1528            "DB instance restored from snapshot",
1529        );
1530
1531        Ok(AwsResponse::xml(
1532            StatusCode::OK,
1533            xml_wrap(
1534                "RestoreDBInstanceFromDBSnapshot",
1535                &format!(
1536                    "<DBInstance>{}</DBInstance>",
1537                    db_instance_xml(&instance, None)
1538                ),
1539                &request.request_id,
1540            ),
1541        ))
1542    }
1543
1544    async fn create_db_instance_read_replica(
1545        &self,
1546        request: &AwsRequest,
1547    ) -> Result<AwsResponse, AwsServiceError> {
1548        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1549        let source_db_instance_identifier = required_param(request, "SourceDBInstanceIdentifier")?;
1550
1551        let runtime = self.runtime.as_ref().ok_or_else(|| {
1552            AwsServiceError::aws_error(
1553                StatusCode::SERVICE_UNAVAILABLE,
1554                "InvalidParameterValue",
1555                "Docker/Podman is required for RDS read replicas but is not available",
1556            )
1557        })?;
1558
1559        let (source_instance, db_name) = {
1560            let mut accounts = self.state.write();
1561            let state = accounts.get_or_create(&request.account_id);
1562
1563            if !state.begin_instance_creation(&db_instance_identifier) {
1564                return Err(AwsServiceError::aws_error(
1565                    StatusCode::CONFLICT,
1566                    "DBInstanceAlreadyExists",
1567                    format!("DBInstance {db_instance_identifier} already exists."),
1568                ));
1569            }
1570
1571            let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
1572            {
1573                Some(inst) => inst,
1574                None => {
1575                    state.cancel_instance_creation(&db_instance_identifier);
1576                    return Err(db_instance_not_found(&source_db_instance_identifier));
1577                }
1578            };
1579
1580            let default_db = default_db_name(&source_instance.engine);
1581            let db_name = source_instance
1582                .db_name
1583                .as_deref()
1584                .unwrap_or(default_db)
1585                .to_string();
1586
1587            (source_instance, db_name)
1588        };
1589
1590        let dump_data = match runtime
1591            .dump_database(
1592                &source_db_instance_identifier,
1593                &source_instance.engine,
1594                &source_instance.master_username,
1595                &source_instance.master_user_password,
1596                &db_name,
1597            )
1598            .await
1599        {
1600            Ok(data) => data,
1601            Err(e) => {
1602                self.state
1603                    .write()
1604                    .get_or_create(&request.account_id)
1605                    .cancel_instance_creation(&db_instance_identifier);
1606                return Err(runtime_error_to_service_error(e));
1607            }
1608        };
1609
1610        let (dbi_resource_id, db_instance_arn) = {
1611            let accounts = self.state.read();
1612            let empty = RdsState::new(&request.account_id, &request.region);
1613            let s = accounts.get(&request.account_id).unwrap_or(&empty);
1614            (
1615                s.next_dbi_resource_id(),
1616                s.db_instance_arn(&db_instance_identifier),
1617            )
1618        };
1619        let created_at = Utc::now();
1620
1621        let running = match runtime
1622            .ensure_postgres(
1623                &db_instance_identifier,
1624                &source_instance.engine,
1625                &source_instance.engine_version,
1626                &source_instance.master_username,
1627                &source_instance.master_user_password,
1628                &db_name,
1629            )
1630            .await
1631        {
1632            Ok(running) => running,
1633            Err(e) => {
1634                self.state
1635                    .write()
1636                    .get_or_create(&request.account_id)
1637                    .cancel_instance_creation(&db_instance_identifier);
1638                return Err(runtime_error_to_service_error(e));
1639            }
1640        };
1641
1642        if let Err(e) = runtime
1643            .restore_database(
1644                &db_instance_identifier,
1645                &source_instance.engine,
1646                &source_instance.master_username,
1647                &source_instance.master_user_password,
1648                &db_name,
1649                &dump_data,
1650            )
1651            .await
1652        {
1653            self.state
1654                .write()
1655                .get_or_create(&request.account_id)
1656                .cancel_instance_creation(&db_instance_identifier);
1657            runtime.stop_container(&db_instance_identifier).await;
1658            return Err(runtime_error_to_service_error(e));
1659        }
1660
1661        let replica = build_read_replica_instance(
1662            &db_instance_identifier,
1663            db_instance_arn,
1664            dbi_resource_id,
1665            created_at,
1666            &source_db_instance_identifier,
1667            &source_instance,
1668            &running,
1669        );
1670
1671        let source_missing = {
1672            let mut accounts = self.state.write();
1673            let state = accounts.get_or_create(&request.account_id);
1674            match state.instances.get_mut(&source_db_instance_identifier) {
1675                Some(source) => {
1676                    source
1677                        .read_replica_db_instance_identifiers
1678                        .push(db_instance_identifier.clone());
1679                    state.finish_instance_creation(replica.clone());
1680                    false
1681                }
1682                None => {
1683                    state.cancel_instance_creation(&db_instance_identifier);
1684                    true
1685                }
1686            }
1687        };
1688
1689        if source_missing {
1690            runtime.stop_container(&db_instance_identifier).await;
1691            return Err(db_instance_not_found(&source_db_instance_identifier));
1692        }
1693
1694        self.emit_event(
1695            RdsSourceType::DbInstance,
1696            &db_instance_identifier,
1697            &replica.db_instance_arn,
1698            "RDS-EVENT-0005",
1699            &["creation", "read replica"],
1700            "Read replica DB instance created",
1701        );
1702
1703        Ok(AwsResponse::xml(
1704            StatusCode::OK,
1705            xml_wrap(
1706                "CreateDBInstanceReadReplica",
1707                &format!(
1708                    "<DBInstance>{}</DBInstance>",
1709                    db_instance_xml(&replica, None)
1710                ),
1711                &request.request_id,
1712            ),
1713        ))
1714    }
1715
1716    fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1717        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1718        let db_subnet_group_description = required_param(request, "DBSubnetGroupDescription")?;
1719        let subnet_ids = parse_subnet_ids(request)?;
1720
1721        if subnet_ids.is_empty() {
1722            return Err(AwsServiceError::aws_error(
1723                StatusCode::BAD_REQUEST,
1724                "InvalidParameterValue",
1725                "At least one subnet must be specified.",
1726            ));
1727        }
1728
1729        if subnet_ids.len() < 2 {
1730            return Err(AwsServiceError::aws_error(
1731                StatusCode::BAD_REQUEST,
1732                "DBSubnetGroupDoesNotCoverEnoughAZs",
1733                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1734            ));
1735        }
1736
1737        let mut accounts = self.state.write();
1738        let state = accounts.get_or_create(&request.account_id);
1739
1740        if state.subnet_groups.contains_key(&db_subnet_group_name) {
1741            return Err(AwsServiceError::aws_error(
1742                StatusCode::CONFLICT,
1743                "DBSubnetGroupAlreadyExists",
1744                format!("DBSubnetGroup {db_subnet_group_name} already exists."),
1745            ));
1746        }
1747
1748        let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
1749        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1750            .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
1751            .collect();
1752
1753        // Validate that subnets span at least 2 unique Availability Zones
1754        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1755        if unique_azs.len() < 2 {
1756            return Err(AwsServiceError::aws_error(
1757                StatusCode::BAD_REQUEST,
1758                "DBSubnetGroupDoesNotCoverEnoughAZs",
1759                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1760            ));
1761        }
1762
1763        let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
1764        let tags = parse_tags(request)?;
1765
1766        let subnet_group = DbSubnetGroup {
1767            db_subnet_group_name: db_subnet_group_name.clone(),
1768            db_subnet_group_arn,
1769            db_subnet_group_description,
1770            vpc_id,
1771            subnet_ids,
1772            subnet_availability_zones,
1773            tags,
1774        };
1775
1776        state
1777            .subnet_groups
1778            .insert(db_subnet_group_name, subnet_group.clone());
1779
1780        Ok(AwsResponse::xml(
1781            StatusCode::OK,
1782            xml_wrap(
1783                "CreateDBSubnetGroup",
1784                &format!(
1785                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1786                    db_subnet_group_xml(&subnet_group)
1787                ),
1788                &request.request_id,
1789            ),
1790        ))
1791    }
1792
1793    fn describe_db_subnet_groups(
1794        &self,
1795        request: &AwsRequest,
1796    ) -> Result<AwsResponse, AwsServiceError> {
1797        let db_subnet_group_name = optional_param(request, "DBSubnetGroupName");
1798        let marker = optional_param(request, "Marker");
1799        let max_records = optional_param(request, "MaxRecords");
1800
1801        let accounts = self.state.read();
1802        let empty = RdsState::new(&request.account_id, &request.region);
1803        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1804
1805        // If specific subnet group requested, return just that one (no pagination)
1806        if let Some(name) = db_subnet_group_name {
1807            let sg = state.subnet_groups.get(&name).ok_or_else(|| {
1808                AwsServiceError::aws_error(
1809                    StatusCode::NOT_FOUND,
1810                    "DBSubnetGroupNotFoundFault",
1811                    format!("DBSubnetGroup {} not found.", name),
1812                )
1813            })?;
1814
1815            return Ok(AwsResponse::xml(
1816                StatusCode::OK,
1817                xml_wrap(
1818                    "DescribeDBSubnetGroups",
1819                    &format!(
1820                        "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
1821                        db_subnet_group_xml(sg)
1822                    ),
1823                    &request.request_id,
1824                ),
1825            ));
1826        }
1827
1828        // Get all subnet groups sorted by name
1829        let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
1830        subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
1831
1832        // Apply pagination
1833        let paginated = paginate(subnet_groups, marker, max_records, |sg| {
1834            &sg.db_subnet_group_name
1835        })?;
1836
1837        let marker_xml = paginated
1838            .next_marker
1839            .as_ref()
1840            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1841            .unwrap_or_default();
1842
1843        let body = paginated
1844            .items
1845            .iter()
1846            .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
1847            .collect::<Vec<_>>()
1848            .join("");
1849
1850        Ok(AwsResponse::xml(
1851            StatusCode::OK,
1852            xml_wrap(
1853                "DescribeDBSubnetGroups",
1854                &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
1855                &request.request_id,
1856            ),
1857        ))
1858    }
1859
1860    fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1861        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1862
1863        let mut accounts = self.state.write();
1864        let state = accounts.get_or_create(&request.account_id);
1865
1866        if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
1867            return Err(AwsServiceError::aws_error(
1868                StatusCode::NOT_FOUND,
1869                "DBSubnetGroupNotFoundFault",
1870                format!("DBSubnetGroup {db_subnet_group_name} not found."),
1871            ));
1872        }
1873
1874        Ok(AwsResponse::xml(
1875            StatusCode::OK,
1876            xml_wrap("DeleteDBSubnetGroup", "", &request.request_id),
1877        ))
1878    }
1879
1880    fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1881        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1882        let subnet_ids = parse_subnet_ids(request)?;
1883
1884        if subnet_ids.is_empty() {
1885            return Err(AwsServiceError::aws_error(
1886                StatusCode::BAD_REQUEST,
1887                "InvalidParameterValue",
1888                "At least one subnet must be specified.",
1889            ));
1890        }
1891
1892        if subnet_ids.len() < 2 {
1893            return Err(AwsServiceError::aws_error(
1894                StatusCode::BAD_REQUEST,
1895                "DBSubnetGroupDoesNotCoverEnoughAZs",
1896                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1897            ));
1898        }
1899
1900        let mut accounts = self.state.write();
1901        let state = accounts.get_or_create(&request.account_id);
1902
1903        let region = state.region.clone();
1904
1905        let subnet_group = state
1906            .subnet_groups
1907            .get_mut(&db_subnet_group_name)
1908            .ok_or_else(|| {
1909                AwsServiceError::aws_error(
1910                    StatusCode::NOT_FOUND,
1911                    "DBSubnetGroupNotFoundFault",
1912                    format!("DBSubnetGroup {db_subnet_group_name} not found."),
1913                )
1914            })?;
1915
1916        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1917            .map(|i| format!("{}{}", &region, char::from(b'a' + (i % 6) as u8)))
1918            .collect();
1919
1920        // Validate that subnets span at least 2 unique Availability Zones
1921        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1922        if unique_azs.len() < 2 {
1923            return Err(AwsServiceError::aws_error(
1924                StatusCode::BAD_REQUEST,
1925                "DBSubnetGroupDoesNotCoverEnoughAZs",
1926                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1927            ));
1928        }
1929
1930        subnet_group.subnet_ids = subnet_ids;
1931        subnet_group.subnet_availability_zones = subnet_availability_zones;
1932
1933        let subnet_group_clone = subnet_group.clone();
1934
1935        Ok(AwsResponse::xml(
1936            StatusCode::OK,
1937            xml_wrap(
1938                "ModifyDBSubnetGroup",
1939                &format!(
1940                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1941                    db_subnet_group_xml(&subnet_group_clone)
1942                ),
1943                &request.request_id,
1944            ),
1945        ))
1946    }
1947
1948    fn create_db_parameter_group(
1949        &self,
1950        request: &AwsRequest,
1951    ) -> Result<AwsResponse, AwsServiceError> {
1952        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1953        let db_parameter_group_family = required_param(request, "DBParameterGroupFamily")?;
1954        let description = required_param(request, "Description")?;
1955
1956        // Validate parameter group family against supported engines and versions
1957        let valid_families = [
1958            "postgres16",
1959            "postgres15",
1960            "postgres14",
1961            "postgres13",
1962            "mysql8.0",
1963            "mysql5.7",
1964            "mariadb10.11",
1965            "mariadb10.6",
1966        ];
1967
1968        if !valid_families.contains(&db_parameter_group_family.as_str()) {
1969            return Err(AwsServiceError::aws_error(
1970                StatusCode::BAD_REQUEST,
1971                "InvalidParameterValue",
1972                format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
1973            ));
1974        }
1975
1976        let mut accounts = self.state.write();
1977        let state = accounts.get_or_create(&request.account_id);
1978
1979        if state
1980            .parameter_groups
1981            .contains_key(&db_parameter_group_name)
1982        {
1983            return Err(AwsServiceError::aws_error(
1984                StatusCode::CONFLICT,
1985                "DBParameterGroupAlreadyExists",
1986                format!("DBParameterGroup {db_parameter_group_name} already exists."),
1987            ));
1988        }
1989
1990        let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
1991        let tags = parse_tags(request)?;
1992
1993        let parameter_group = DbParameterGroup {
1994            db_parameter_group_name: db_parameter_group_name.clone(),
1995            db_parameter_group_arn,
1996            db_parameter_group_family,
1997            description,
1998            parameters: std::collections::HashMap::new(),
1999            tags,
2000        };
2001
2002        state
2003            .parameter_groups
2004            .insert(db_parameter_group_name, parameter_group.clone());
2005
2006        Ok(AwsResponse::xml(
2007            StatusCode::OK,
2008            xml_wrap(
2009                "CreateDBParameterGroup",
2010                &format!(
2011                    "<DBParameterGroup>{}</DBParameterGroup>",
2012                    db_parameter_group_xml(&parameter_group)
2013                ),
2014                &request.request_id,
2015            ),
2016        ))
2017    }
2018
2019    fn describe_db_parameter_groups(
2020        &self,
2021        request: &AwsRequest,
2022    ) -> Result<AwsResponse, AwsServiceError> {
2023        let db_parameter_group_name = optional_param(request, "DBParameterGroupName");
2024        let marker = optional_param(request, "Marker");
2025        let max_records = optional_param(request, "MaxRecords");
2026
2027        let accounts = self.state.read();
2028        let empty = RdsState::new(&request.account_id, &request.region);
2029        let state = accounts.get(&request.account_id).unwrap_or(&empty);
2030
2031        // If specific parameter group requested, return just that one (no pagination)
2032        if let Some(name) = db_parameter_group_name {
2033            let pg = state.parameter_groups.get(&name).ok_or_else(|| {
2034                AwsServiceError::aws_error(
2035                    StatusCode::NOT_FOUND,
2036                    "DBParameterGroupNotFound",
2037                    format!("DBParameterGroup {} not found.", name),
2038                )
2039            })?;
2040
2041            return Ok(AwsResponse::xml(
2042                StatusCode::OK,
2043                xml_wrap(
2044                    "DescribeDBParameterGroups",
2045                    &format!(
2046                        "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
2047                        db_parameter_group_xml(pg)
2048                    ),
2049                    &request.request_id,
2050                ),
2051            ));
2052        }
2053
2054        // Get all parameter groups sorted by name
2055        let mut parameter_groups: Vec<DbParameterGroup> =
2056            state.parameter_groups.values().cloned().collect();
2057        parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
2058
2059        // Apply pagination
2060        let paginated = paginate(parameter_groups, marker, max_records, |pg| {
2061            &pg.db_parameter_group_name
2062        })?;
2063
2064        let marker_xml = paginated
2065            .next_marker
2066            .as_ref()
2067            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
2068            .unwrap_or_default();
2069
2070        let body = paginated
2071            .items
2072            .iter()
2073            .map(|pg| {
2074                format!(
2075                    "<DBParameterGroup>{}</DBParameterGroup>",
2076                    db_parameter_group_xml(pg)
2077                )
2078            })
2079            .collect::<Vec<_>>()
2080            .join("");
2081
2082        Ok(AwsResponse::xml(
2083            StatusCode::OK,
2084            xml_wrap(
2085                "DescribeDBParameterGroups",
2086                &format!(
2087                    "<DBParameterGroups>{}</DBParameterGroups>{}",
2088                    body, marker_xml
2089                ),
2090                &request.request_id,
2091            ),
2092        ))
2093    }
2094
2095    fn delete_db_parameter_group(
2096        &self,
2097        request: &AwsRequest,
2098    ) -> Result<AwsResponse, AwsServiceError> {
2099        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2100
2101        let mut accounts = self.state.write();
2102        let state = accounts.get_or_create(&request.account_id);
2103
2104        if db_parameter_group_name.starts_with("default.") {
2105            return Err(AwsServiceError::aws_error(
2106                StatusCode::BAD_REQUEST,
2107                "InvalidParameterValue",
2108                "Cannot delete default parameter groups.",
2109            ));
2110        }
2111
2112        if state
2113            .parameter_groups
2114            .remove(&db_parameter_group_name)
2115            .is_none()
2116        {
2117            return Err(AwsServiceError::aws_error(
2118                StatusCode::NOT_FOUND,
2119                "DBParameterGroupNotFound",
2120                format!("DBParameterGroup {db_parameter_group_name} not found."),
2121            ));
2122        }
2123
2124        Ok(AwsResponse::xml(
2125            StatusCode::OK,
2126            xml_wrap("DeleteDBParameterGroup", "", &request.request_id),
2127        ))
2128    }
2129
2130    fn modify_db_parameter_group(
2131        &self,
2132        request: &AwsRequest,
2133    ) -> Result<AwsResponse, AwsServiceError> {
2134        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2135
2136        let mut accounts = self.state.write();
2137        let state = accounts.get_or_create(&request.account_id);
2138
2139        let parameter_group = state
2140            .parameter_groups
2141            .get_mut(&db_parameter_group_name)
2142            .ok_or_else(|| {
2143                AwsServiceError::aws_error(
2144                    StatusCode::NOT_FOUND,
2145                    "DBParameterGroupNotFound",
2146                    format!("DBParameterGroup {db_parameter_group_name} not found."),
2147                )
2148            })?;
2149
2150        if let Some(new_description) = optional_param(request, "Description") {
2151            parameter_group.description = new_description;
2152        }
2153
2154        let parameter_group_clone = parameter_group.clone();
2155
2156        Ok(AwsResponse::xml(
2157            StatusCode::OK,
2158            xml_wrap(
2159                "ModifyDBParameterGroup",
2160                &format!(
2161                    "<DBParameterGroupName>{}</DBParameterGroupName>",
2162                    xml_escape(&parameter_group_clone.db_parameter_group_name)
2163                ),
2164                &request.request_id,
2165            ),
2166        ))
2167    }
2168}
2169
2170fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
2171    fakecloud_core::query::optional_query_param(req, name)
2172}
2173
2174fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
2175    fakecloud_core::query::required_query_param(req, name)
2176}
2177
2178fn required_i32_param(req: &AwsRequest, name: &str) -> Result<i32, AwsServiceError> {
2179    let value = required_param(req, name)?;
2180    value.parse::<i32>().map_err(|_| {
2181        AwsServiceError::aws_error(
2182            StatusCode::BAD_REQUEST,
2183            "InvalidParameterValue",
2184            format!("Parameter {name} must be a valid integer."),
2185        )
2186    })
2187}
2188
2189fn optional_i32_param(req: &AwsRequest, name: &str) -> Result<Option<i32>, AwsServiceError> {
2190    optional_param(req, name)
2191        .map(|value| {
2192            value.parse::<i32>().map_err(|_| {
2193                AwsServiceError::aws_error(
2194                    StatusCode::BAD_REQUEST,
2195                    "InvalidParameterValue",
2196                    format!("Parameter {name} must be a valid integer."),
2197                )
2198            })
2199        })
2200        .transpose()
2201}
2202
2203fn parse_tags(req: &AwsRequest) -> Result<Vec<RdsTag>, AwsServiceError> {
2204    let mut tags = Vec::new();
2205    for index in 1.. {
2206        let key_name = format!("Tags.Tag.{index}.Key");
2207        let value_name = format!("Tags.Tag.{index}.Value");
2208        let key = optional_param(req, &key_name);
2209        let value = optional_param(req, &value_name);
2210
2211        match (key, value) {
2212            (Some(key), Some(value)) => tags.push(RdsTag { key, value }),
2213            (None, None) => break,
2214            _ => {
2215                return Err(AwsServiceError::aws_error(
2216                    StatusCode::BAD_REQUEST,
2217                    "InvalidParameterValue",
2218                    "Each tag must include both Key and Value.",
2219                ));
2220            }
2221        }
2222    }
2223
2224    Ok(tags)
2225}
2226
2227fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
2228    let mut keys = Vec::new();
2229    for index in 1.. {
2230        let key_name = format!("TagKeys.member.{index}");
2231        match optional_param(req, &key_name) {
2232            Some(key) => keys.push(key),
2233            None => break,
2234        }
2235    }
2236
2237    Ok(keys)
2238}
2239
2240fn parse_subnet_ids(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
2241    let mut subnet_ids = Vec::new();
2242    for index in 1.. {
2243        let subnet_id_name = format!("SubnetIds.SubnetIdentifier.{index}");
2244        match optional_param(req, &subnet_id_name) {
2245            Some(subnet_id) => subnet_ids.push(subnet_id),
2246            None => break,
2247        }
2248    }
2249
2250    Ok(subnet_ids)
2251}
2252
2253fn parse_vpc_security_group_ids(req: &AwsRequest) -> Vec<String> {
2254    let mut security_group_ids = Vec::new();
2255    for index in 1.. {
2256        let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
2257        match optional_param(req, &sg_id_name) {
2258            Some(sg_id) => security_group_ids.push(sg_id),
2259            None => break,
2260        }
2261    }
2262
2263    // If no security groups provided, return a default one
2264    if security_group_ids.is_empty() {
2265        security_group_ids.push("sg-default".to_string());
2266    }
2267
2268    security_group_ids
2269}
2270
2271fn query_param_prefix_exists(req: &AwsRequest, prefix: &str) -> bool {
2272    req.query_params.keys().any(|key| key.starts_with(prefix))
2273}
2274
2275fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
2276    value
2277        .map(|raw| match raw {
2278            "true" | "True" | "TRUE" => Ok(true),
2279            "false" | "False" | "FALSE" => Ok(false),
2280            _ => Err(AwsServiceError::aws_error(
2281                StatusCode::BAD_REQUEST,
2282                "InvalidParameterValue",
2283                format!("Boolean parameter value '{raw}' is invalid."),
2284            )),
2285        })
2286        .transpose()
2287}
2288
2289struct PaginationResult<T> {
2290    items: Vec<T>,
2291    next_marker: Option<String>,
2292}
2293
2294fn paginate<T, F>(
2295    mut items: Vec<T>,
2296    marker: Option<String>,
2297    max_records: Option<String>,
2298    get_id: F,
2299) -> Result<PaginationResult<T>, AwsServiceError>
2300where
2301    F: Fn(&T) -> &str,
2302{
2303    // Parse max_records with default 100, max 100
2304    let max = if let Some(max_str) = max_records {
2305        let parsed = max_str.parse::<i32>().map_err(|_| {
2306            AwsServiceError::aws_error(
2307                StatusCode::BAD_REQUEST,
2308                "InvalidParameterValue",
2309                "MaxRecords must be a valid integer.",
2310            )
2311        })?;
2312        if !(1..=100).contains(&parsed) {
2313            return Err(AwsServiceError::aws_error(
2314                StatusCode::BAD_REQUEST,
2315                "InvalidParameterValue",
2316                "MaxRecords must be between 1 and 100.",
2317            ));
2318        }
2319        parsed as usize
2320    } else {
2321        100
2322    };
2323
2324    // Decode marker to get starting identifier
2325    let start_id = if let Some(encoded_marker) = marker {
2326        let decoded = BASE64.decode(encoded_marker.as_bytes()).map_err(|_| {
2327            AwsServiceError::aws_error(
2328                StatusCode::BAD_REQUEST,
2329                "InvalidParameterValue",
2330                "Marker is invalid.",
2331            )
2332        })?;
2333        let id = String::from_utf8(decoded).map_err(|_| {
2334            AwsServiceError::aws_error(
2335                StatusCode::BAD_REQUEST,
2336                "InvalidParameterValue",
2337                "Marker is invalid.",
2338            )
2339        })?;
2340        Some(id)
2341    } else {
2342        None
2343    };
2344
2345    // Find starting position
2346    let start_index = if let Some(ref start_id) = start_id {
2347        items
2348            .iter()
2349            .position(|item| get_id(item) == start_id)
2350            .map(|pos| pos + 1) // Start after the marker
2351            .unwrap_or(items.len()) // If not found, return empty result
2352    } else {
2353        0
2354    };
2355
2356    // Take items from start_index
2357    let total_items = items.len();
2358    let end_index = std::cmp::min(start_index + max, total_items);
2359    let paginated_items: Vec<T> = items.drain(start_index..end_index).collect();
2360
2361    // Create next marker if there are more items
2362    let next_marker = if end_index < total_items {
2363        paginated_items
2364            .last()
2365            .map(|item| BASE64.encode(get_id(item).as_bytes()))
2366    } else {
2367        None
2368    };
2369
2370    Ok(PaginationResult {
2371        items: paginated_items,
2372        next_marker,
2373    })
2374}
2375
2376fn validate_create_request(
2377    db_instance_identifier: &str,
2378    allocated_storage: i32,
2379    db_instance_class: &str,
2380    engine: &str,
2381    engine_version: &str,
2382    port: i32,
2383) -> Result<(), AwsServiceError> {
2384    if allocated_storage <= 0 {
2385        return Err(AwsServiceError::aws_error(
2386            StatusCode::BAD_REQUEST,
2387            "InvalidParameterValue",
2388            "AllocatedStorage must be greater than zero.",
2389        ));
2390    }
2391    if port <= 0 {
2392        return Err(AwsServiceError::aws_error(
2393            StatusCode::BAD_REQUEST,
2394            "InvalidParameterValue",
2395            "Port must be greater than zero.",
2396        ));
2397    }
2398    if !db_instance_identifier
2399        .chars()
2400        .all(|ch| ch.is_ascii_alphanumeric() || ch == '-')
2401    {
2402        return Err(AwsServiceError::aws_error(
2403            StatusCode::BAD_REQUEST,
2404            "InvalidParameterValue",
2405            "DBInstanceIdentifier must contain only alphanumeric characters or hyphens.",
2406        ));
2407    }
2408    // Validate engine
2409    let supported_engines = ["postgres", "mysql", "mariadb"];
2410    if !supported_engines.contains(&engine) {
2411        return Err(AwsServiceError::aws_error(
2412            StatusCode::BAD_REQUEST,
2413            "InvalidParameterValue",
2414            format!("Engine '{}' is not supported.", engine),
2415        ));
2416    }
2417
2418    // Validate engine version
2419    let supported_versions = match engine {
2420        "postgres" => vec!["16.3", "15.5", "14.10", "13.13"],
2421        "mysql" => vec!["8.0.35", "8.0.28", "5.7.44"],
2422        "mariadb" => vec!["10.11.6", "10.6.16"],
2423        _ => vec![],
2424    };
2425
2426    if !supported_versions.contains(&engine_version) {
2427        return Err(AwsServiceError::aws_error(
2428            StatusCode::BAD_REQUEST,
2429            "InvalidParameterValue",
2430            format!("EngineVersion '{engine_version}' is not supported yet."),
2431        ));
2432    }
2433    validate_db_instance_class(db_instance_class)?;
2434    Ok(())
2435}
2436
2437fn validate_db_instance_class(db_instance_class: &str) -> Result<(), AwsServiceError> {
2438    if !crate::state::SUPPORTED_INSTANCE_CLASSES.contains(&db_instance_class) {
2439        return Err(AwsServiceError::aws_error(
2440            StatusCode::BAD_REQUEST,
2441            "InvalidParameterValue",
2442            format!("DBInstanceClass '{}' is not supported.", db_instance_class),
2443        ));
2444    }
2445    Ok(())
2446}
2447
2448fn filter_engine_versions(
2449    versions: &[EngineVersionInfo],
2450    engine: &Option<String>,
2451    engine_version: &Option<String>,
2452    family: &Option<String>,
2453) -> Vec<EngineVersionInfo> {
2454    versions
2455        .iter()
2456        .filter(|candidate| {
2457            engine
2458                .as_ref()
2459                .is_none_or(|expected| candidate.engine == *expected)
2460        })
2461        .filter(|candidate| {
2462            engine_version
2463                .as_ref()
2464                .is_none_or(|expected| candidate.engine_version == *expected)
2465        })
2466        .filter(|candidate| {
2467            family
2468                .as_ref()
2469                .is_none_or(|expected| candidate.db_parameter_group_family == *expected)
2470        })
2471        .cloned()
2472        .collect()
2473}
2474
2475fn filter_orderable_options(
2476    options: &[OrderableDbInstanceOption],
2477    engine: &Option<String>,
2478    engine_version: &Option<String>,
2479    db_instance_class: &Option<String>,
2480    license_model: &Option<String>,
2481    vpc: Option<bool>,
2482) -> Vec<OrderableDbInstanceOption> {
2483    options
2484        .iter()
2485        .filter(|candidate| {
2486            engine
2487                .as_ref()
2488                .is_none_or(|expected| candidate.engine == *expected)
2489        })
2490        .filter(|candidate| {
2491            engine_version
2492                .as_ref()
2493                .is_none_or(|expected| candidate.engine_version == *expected)
2494        })
2495        .filter(|candidate| {
2496            db_instance_class
2497                .as_ref()
2498                .is_none_or(|expected| candidate.db_instance_class == *expected)
2499        })
2500        .filter(|candidate| {
2501            license_model
2502                .as_ref()
2503                .is_none_or(|expected| candidate.license_model == *expected)
2504        })
2505        .filter(|_| vpc.unwrap_or(true))
2506        .cloned()
2507        .collect()
2508}
2509
2510/// Build a `DbInstance` for a newly-created read replica, copying the
2511/// source instance's physical attributes and binding the replica's
2512/// identifier, ARN, resource id, container id and host port.
2513#[allow(clippy::too_many_arguments)]
2514/// Build a `DbInstance` from a restored snapshot. Copies the physical
2515/// attributes off the snapshot and binds the new instance's identifier,
2516/// ARN, resource id, container id and host port.
2517fn build_restored_instance(
2518    db_instance_identifier: &str,
2519    db_instance_arn: String,
2520    dbi_resource_id: String,
2521    created_at: chrono::DateTime<Utc>,
2522    vpc_security_group_ids: Vec<String>,
2523    snapshot: &DbSnapshot,
2524    running: &crate::runtime::RunningDbContainer,
2525) -> DbInstance {
2526    DbInstance {
2527        db_instance_identifier: db_instance_identifier.to_string(),
2528        db_instance_arn,
2529        db_instance_class: "db.t3.micro".to_string(),
2530        engine: snapshot.engine.clone(),
2531        engine_version: snapshot.engine_version.clone(),
2532        db_instance_status: "available".to_string(),
2533        master_username: snapshot.master_username.clone(),
2534        db_name: snapshot.db_name.clone(),
2535        endpoint_address: "127.0.0.1".to_string(),
2536        port: i32::from(running.host_port),
2537        allocated_storage: snapshot.allocated_storage,
2538        publicly_accessible: true,
2539        deletion_protection: false,
2540        created_at,
2541        dbi_resource_id,
2542        master_user_password: snapshot.master_user_password.clone(),
2543        container_id: running.container_id.clone(),
2544        host_port: running.host_port,
2545        tags: Vec::new(),
2546        read_replica_source_db_instance_identifier: None,
2547        read_replica_db_instance_identifiers: Vec::new(),
2548        vpc_security_group_ids,
2549        db_parameter_group_name: None,
2550        backup_retention_period: 1,
2551        preferred_backup_window: "03:00-04:00".to_string(),
2552        latest_restorable_time: Some(created_at),
2553        option_group_name: None,
2554        multi_az: false,
2555        pending_modified_values: None,
2556    }
2557}
2558
2559fn build_read_replica_instance(
2560    db_instance_identifier: &str,
2561    db_instance_arn: String,
2562    dbi_resource_id: String,
2563    created_at: chrono::DateTime<Utc>,
2564    source_db_instance_identifier: &str,
2565    source: &DbInstance,
2566    running: &crate::runtime::RunningDbContainer,
2567) -> DbInstance {
2568    DbInstance {
2569        db_instance_identifier: db_instance_identifier.to_string(),
2570        db_instance_arn,
2571        db_instance_class: source.db_instance_class.clone(),
2572        engine: source.engine.clone(),
2573        engine_version: source.engine_version.clone(),
2574        db_instance_status: "available".to_string(),
2575        master_username: source.master_username.clone(),
2576        db_name: source.db_name.clone(),
2577        endpoint_address: "127.0.0.1".to_string(),
2578        port: i32::from(running.host_port),
2579        allocated_storage: source.allocated_storage,
2580        publicly_accessible: source.publicly_accessible,
2581        deletion_protection: false,
2582        created_at,
2583        dbi_resource_id,
2584        master_user_password: source.master_user_password.clone(),
2585        container_id: running.container_id.clone(),
2586        host_port: running.host_port,
2587        tags: Vec::new(),
2588        read_replica_source_db_instance_identifier: Some(source_db_instance_identifier.to_string()),
2589        read_replica_db_instance_identifiers: Vec::new(),
2590        vpc_security_group_ids: source.vpc_security_group_ids.clone(),
2591        db_parameter_group_name: source.db_parameter_group_name.clone(),
2592        backup_retention_period: source.backup_retention_period,
2593        preferred_backup_window: source.preferred_backup_window.clone(),
2594        latest_restorable_time: if source.backup_retention_period > 0 {
2595            Some(created_at)
2596        } else {
2597            None
2598        },
2599        option_group_name: source.option_group_name.clone(),
2600        multi_az: source.multi_az,
2601        pending_modified_values: None,
2602    }
2603}
2604
2605fn xml_wrap(action: &str, inner: &str, request_id: &str) -> String {
2606    fakecloud_core::query::query_response_xml(action, RDS_NS, inner, request_id)
2607}
2608
2609fn engine_version_xml(version: &EngineVersionInfo) -> String {
2610    format!(
2611        "<DBEngineVersion>\
2612         <Engine>{}</Engine>\
2613         <EngineVersion>{}</EngineVersion>\
2614         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2615         <DBEngineDescription>{}</DBEngineDescription>\
2616         <DBEngineVersionDescription>{}</DBEngineVersionDescription>\
2617         <Status>{}</Status>\
2618         </DBEngineVersion>",
2619        xml_escape(&version.engine),
2620        xml_escape(&version.engine_version),
2621        xml_escape(&version.db_parameter_group_family),
2622        xml_escape(&version.db_engine_description),
2623        xml_escape(&version.db_engine_version_description),
2624        xml_escape(&version.status),
2625    )
2626}
2627
2628fn orderable_option_xml(option: &OrderableDbInstanceOption) -> String {
2629    format!(
2630        "<OrderableDBInstanceOption>\
2631         <Engine>{}</Engine>\
2632         <EngineVersion>{}</EngineVersion>\
2633         <DBInstanceClass>{}</DBInstanceClass>\
2634         <LicenseModel>{}</LicenseModel>\
2635         <AvailabilityZones><AvailabilityZone><Name>us-east-1a</Name></AvailabilityZone></AvailabilityZones>\
2636         <MultiAZCapable>true</MultiAZCapable>\
2637         <ReadReplicaCapable>true</ReadReplicaCapable>\
2638         <Vpc>true</Vpc>\
2639         <SupportsStorageEncryption>true</SupportsStorageEncryption>\
2640         <StorageType>{}</StorageType>\
2641         <SupportsIops>false</SupportsIops>\
2642         <MinStorageSize>{}</MinStorageSize>\
2643         <MaxStorageSize>{}</MaxStorageSize>\
2644         <SupportsIAMDatabaseAuthentication>true</SupportsIAMDatabaseAuthentication>\
2645         </OrderableDBInstanceOption>",
2646        xml_escape(&option.engine),
2647        xml_escape(&option.engine_version),
2648        xml_escape(&option.db_instance_class),
2649        xml_escape(&option.license_model),
2650        xml_escape(&option.storage_type),
2651        option.min_storage_size,
2652        option.max_storage_size,
2653    )
2654}
2655
2656fn tag_xml(tag: &RdsTag) -> String {
2657    format!(
2658        "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2659        xml_escape(&tag.key),
2660        xml_escape(&tag.value),
2661    )
2662}
2663
2664fn db_instance_xml(instance: &DbInstance, status_override: Option<&str>) -> String {
2665    let status = status_override.unwrap_or(&instance.db_instance_status);
2666    let db_name_xml = instance
2667        .db_name
2668        .as_ref()
2669        .map(|db_name| format!("<DBName>{}</DBName>", xml_escape(db_name)))
2670        .unwrap_or_default();
2671
2672    let read_replica_source_xml = instance
2673        .read_replica_source_db_instance_identifier
2674        .as_ref()
2675        .map(|source| {
2676            format!(
2677                "<ReadReplicaSourceDBInstanceIdentifier>{}</ReadReplicaSourceDBInstanceIdentifier>",
2678                xml_escape(source)
2679            )
2680        })
2681        .unwrap_or_default();
2682
2683    let read_replica_identifiers_xml = if instance.read_replica_db_instance_identifiers.is_empty() {
2684        "<ReadReplicaDBInstanceIdentifiers/>".to_string()
2685    } else {
2686        format!(
2687            "<ReadReplicaDBInstanceIdentifiers>{}</ReadReplicaDBInstanceIdentifiers>",
2688            instance
2689                .read_replica_db_instance_identifiers
2690                .iter()
2691                .map(|id| format!(
2692                    "<ReadReplicaDBInstanceIdentifier>{}</ReadReplicaDBInstanceIdentifier>",
2693                    xml_escape(id)
2694                ))
2695                .collect::<String>()
2696        )
2697    };
2698
2699    let vpc_security_groups_xml = if instance.vpc_security_group_ids.is_empty() {
2700        "<VpcSecurityGroups/>".to_string()
2701    } else {
2702        format!(
2703            "<VpcSecurityGroups>{}</VpcSecurityGroups>",
2704            instance
2705                .vpc_security_group_ids
2706                .iter()
2707                .map(|sg_id| format!(
2708                    "<VpcSecurityGroupMembership>\
2709                     <VpcSecurityGroupId>{}</VpcSecurityGroupId>\
2710                     <Status>active</Status>\
2711                     </VpcSecurityGroupMembership>",
2712                    xml_escape(sg_id)
2713                ))
2714                .collect::<String>()
2715        )
2716    };
2717
2718    let db_parameter_groups_xml = match &instance.db_parameter_group_name {
2719        Some(pg_name) => format!(
2720            "<DBParameterGroups>\
2721             <DBParameterGroup>\
2722             <DBParameterGroupName>{}</DBParameterGroupName>\
2723             <ParameterApplyStatus>in-sync</ParameterApplyStatus>\
2724             </DBParameterGroup>\
2725             </DBParameterGroups>",
2726            xml_escape(pg_name)
2727        ),
2728        None => "<DBParameterGroups/>".to_string(),
2729    };
2730
2731    let option_group_memberships_xml = match &instance.option_group_name {
2732        Some(og_name) => format!(
2733            "<OptionGroupMemberships>\
2734             <OptionGroupMembership>\
2735             <OptionGroupName>{}</OptionGroupName>\
2736             <Status>in-sync</Status>\
2737             </OptionGroupMembership>\
2738             </OptionGroupMemberships>",
2739            xml_escape(og_name)
2740        ),
2741        None => "<OptionGroupMemberships/>".to_string(),
2742    };
2743
2744    let pending_modified_values_xml = if let Some(ref pending) = instance.pending_modified_values {
2745        let mut fields = Vec::new();
2746        if let Some(ref class) = pending.db_instance_class {
2747            fields.push(format!(
2748                "<DBInstanceClass>{}</DBInstanceClass>",
2749                xml_escape(class)
2750            ));
2751        }
2752        if let Some(allocated_storage) = pending.allocated_storage {
2753            fields.push(format!(
2754                "<AllocatedStorage>{}</AllocatedStorage>",
2755                allocated_storage
2756            ));
2757        }
2758        if let Some(backup_retention_period) = pending.backup_retention_period {
2759            fields.push(format!(
2760                "<BackupRetentionPeriod>{}</BackupRetentionPeriod>",
2761                backup_retention_period
2762            ));
2763        }
2764        if let Some(multi_az) = pending.multi_az {
2765            fields.push(format!(
2766                "<MultiAZ>{}</MultiAZ>",
2767                if multi_az { "true" } else { "false" }
2768            ));
2769        }
2770        if let Some(ref engine_version) = pending.engine_version {
2771            fields.push(format!(
2772                "<EngineVersion>{}</EngineVersion>",
2773                xml_escape(engine_version)
2774            ));
2775        }
2776        if pending.master_user_password.is_some() {
2777            fields.push("<MasterUserPassword>****</MasterUserPassword>".to_string());
2778        }
2779        if !fields.is_empty() {
2780            format!(
2781                "<PendingModifiedValues>{}</PendingModifiedValues>",
2782                fields.join("")
2783            )
2784        } else {
2785            String::new()
2786        }
2787    } else {
2788        String::new()
2789    };
2790
2791    let latest_restorable_time_xml = instance
2792        .latest_restorable_time
2793        .map(|t| {
2794            format!(
2795                "<LatestRestorableTime>{}</LatestRestorableTime>",
2796                t.to_rfc3339()
2797            )
2798        })
2799        .unwrap_or_default();
2800
2801    format!(
2802        "<DBInstanceIdentifier>{identifier}</DBInstanceIdentifier>\
2803         <DBInstanceClass>{class}</DBInstanceClass>\
2804         <Engine>{engine}</Engine>\
2805         <DBInstanceStatus>{status}</DBInstanceStatus>\
2806         <MasterUsername>{master_username}</MasterUsername>\
2807         {db_name_xml}\
2808         <Endpoint><Address>{endpoint_address}</Address><Port>{port}</Port></Endpoint>\
2809         <AllocatedStorage>{allocated_storage}</AllocatedStorage>\
2810         <InstanceCreateTime>{create_time}</InstanceCreateTime>\
2811         <PreferredBackupWindow>{preferred_backup_window}</PreferredBackupWindow>\
2812         <BackupRetentionPeriod>{backup_retention_period}</BackupRetentionPeriod>\
2813         <DBSecurityGroups/>\
2814         {vpc_security_groups_xml}\
2815         {db_parameter_groups_xml}\
2816         <AvailabilityZone>us-east-1a</AvailabilityZone>\
2817         {latest_restorable_time_xml}\
2818         <PreferredMaintenanceWindow>sun:00:00-sun:00:30</PreferredMaintenanceWindow>\
2819         <MultiAZ>{multi_az}</MultiAZ>\
2820         <EngineVersion>{engine_version}</EngineVersion>\
2821         <AutoMinorVersionUpgrade>true</AutoMinorVersionUpgrade>\
2822         {read_replica_identifiers_xml}\
2823         {read_replica_source_xml}\
2824         <LicenseModel>{license_model}</LicenseModel>\
2825         {option_group_memberships_xml}\
2826         <PubliclyAccessible>{publicly_accessible}</PubliclyAccessible>\
2827         <StorageType>gp2</StorageType>\
2828         <DbInstancePort>{port}</DbInstancePort>\
2829         <StorageEncrypted>false</StorageEncrypted>\
2830         <DbiResourceId>{dbi_resource_id}</DbiResourceId>\
2831         <DeletionProtection>{deletion_protection}</DeletionProtection>\
2832         {pending_modified_values_xml}\
2833         <DBInstanceArn>{arn}</DBInstanceArn>",
2834        identifier = xml_escape(&instance.db_instance_identifier),
2835        class = xml_escape(&instance.db_instance_class),
2836        engine = xml_escape(&instance.engine),
2837        status = xml_escape(status),
2838        master_username = xml_escape(&instance.master_username),
2839        endpoint_address = xml_escape(&instance.endpoint_address),
2840        port = instance.port,
2841        allocated_storage = instance.allocated_storage,
2842        create_time = instance.created_at.to_rfc3339(),
2843        preferred_backup_window = xml_escape(&instance.preferred_backup_window),
2844        backup_retention_period = instance.backup_retention_period,
2845        multi_az = if instance.multi_az { "true" } else { "false" },
2846        engine_version = xml_escape(&instance.engine_version),
2847        license_model = license_model_for_engine(&instance.engine),
2848        publicly_accessible = if instance.publicly_accessible {
2849            "true"
2850        } else {
2851            "false"
2852        },
2853        dbi_resource_id = xml_escape(&instance.dbi_resource_id),
2854        deletion_protection = if instance.deletion_protection {
2855            "true"
2856        } else {
2857            "false"
2858        },
2859        arn = xml_escape(&instance.db_instance_arn),
2860    )
2861}
2862
2863fn db_snapshot_xml(snapshot: &DbSnapshot) -> String {
2864    format!(
2865        "<DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\
2866         <DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2867         <SnapshotCreateTime>{}</SnapshotCreateTime>\
2868         <Engine>{}</Engine>\
2869         <EngineVersion>{}</EngineVersion>\
2870         <AllocatedStorage>{}</AllocatedStorage>\
2871         <Status>{}</Status>\
2872         <Port>{}</Port>\
2873         <MasterUsername>{}</MasterUsername>\
2874         {}\
2875         <DbiResourceId>{}</DbiResourceId>\
2876         <SnapshotType>{}</SnapshotType>\
2877         <DBSnapshotArn>{}</DBSnapshotArn>",
2878        xml_escape(&snapshot.db_snapshot_identifier),
2879        xml_escape(&snapshot.db_instance_identifier),
2880        snapshot.snapshot_create_time.to_rfc3339(),
2881        xml_escape(&snapshot.engine),
2882        xml_escape(&snapshot.engine_version),
2883        snapshot.allocated_storage,
2884        xml_escape(&snapshot.status),
2885        snapshot.port,
2886        xml_escape(&snapshot.master_username),
2887        snapshot
2888            .db_name
2889            .as_ref()
2890            .map(|name| format!("<DBName>{}</DBName>", xml_escape(name)))
2891            .unwrap_or_default(),
2892        xml_escape(&snapshot.dbi_resource_id),
2893        xml_escape(&snapshot.snapshot_type),
2894        xml_escape(&snapshot.db_snapshot_arn),
2895    )
2896}
2897
2898fn db_subnet_group_xml(subnet_group: &DbSubnetGroup) -> String {
2899    let subnets_xml = subnet_group
2900        .subnet_ids
2901        .iter()
2902        .zip(&subnet_group.subnet_availability_zones)
2903        .map(|(subnet_id, az)| {
2904            format!(
2905                "<Subnet>\
2906                 <SubnetIdentifier>{}</SubnetIdentifier>\
2907                 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
2908                 <SubnetStatus>Active</SubnetStatus>\
2909                 </Subnet>",
2910                xml_escape(subnet_id),
2911                xml_escape(az)
2912            )
2913        })
2914        .collect::<String>();
2915
2916    format!(
2917        "<DBSubnetGroupName>{}</DBSubnetGroupName>\
2918         <DBSubnetGroupDescription>{}</DBSubnetGroupDescription>\
2919         <VpcId>{}</VpcId>\
2920         <SubnetGroupStatus>Complete</SubnetGroupStatus>\
2921         <Subnets>{}</Subnets>\
2922         <DBSubnetGroupArn>{}</DBSubnetGroupArn>",
2923        xml_escape(&subnet_group.db_subnet_group_name),
2924        xml_escape(&subnet_group.db_subnet_group_description),
2925        xml_escape(&subnet_group.vpc_id),
2926        subnets_xml,
2927        xml_escape(&subnet_group.db_subnet_group_arn),
2928    )
2929}
2930
2931fn db_parameter_group_xml(parameter_group: &DbParameterGroup) -> String {
2932    format!(
2933        "<DBParameterGroupName>{}</DBParameterGroupName>\
2934         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2935         <Description>{}</Description>\
2936         <DBParameterGroupArn>{}</DBParameterGroupArn>",
2937        xml_escape(&parameter_group.db_parameter_group_name),
2938        xml_escape(&parameter_group.db_parameter_group_family),
2939        xml_escape(&parameter_group.description),
2940        xml_escape(&parameter_group.db_parameter_group_arn),
2941    )
2942}
2943
2944fn db_instance_not_found(identifier: &str) -> AwsServiceError {
2945    AwsServiceError::aws_error(
2946        StatusCode::NOT_FOUND,
2947        "DBInstanceNotFound",
2948        format!("DBInstance {} not found.", identifier),
2949    )
2950}
2951
2952fn db_snapshot_not_found(identifier: &str) -> AwsServiceError {
2953    AwsServiceError::aws_error(
2954        StatusCode::NOT_FOUND,
2955        "DBSnapshotNotFound",
2956        format!("DBSnapshot {} not found.", identifier),
2957    )
2958}
2959
2960fn db_instance_not_found_by_arn(resource_name: &str) -> AwsServiceError {
2961    AwsServiceError::aws_error(
2962        StatusCode::NOT_FOUND,
2963        "DBInstanceNotFound",
2964        format!("DBInstance {resource_name} not found."),
2965    )
2966}
2967
2968fn find_instance_by_arn<'a>(
2969    state: &'a crate::state::RdsState,
2970    resource_name: &str,
2971) -> Result<&'a DbInstance, AwsServiceError> {
2972    state
2973        .instances
2974        .values()
2975        .find(|instance| instance.db_instance_arn == resource_name)
2976        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2977}
2978
2979fn find_instance_by_arn_mut<'a>(
2980    state: &'a mut crate::state::RdsState,
2981    resource_name: &str,
2982) -> Result<&'a mut DbInstance, AwsServiceError> {
2983    state
2984        .instances
2985        .values_mut()
2986        .find(|instance| instance.db_instance_arn == resource_name)
2987        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2988}
2989
2990fn merge_tags(existing: &mut Vec<RdsTag>, incoming: &[RdsTag]) {
2991    for tag in incoming {
2992        if let Some(existing_tag) = existing
2993            .iter_mut()
2994            .find(|candidate| candidate.key == tag.key)
2995        {
2996            existing_tag.value = tag.value.clone();
2997        } else {
2998            existing.push(tag.clone());
2999        }
3000    }
3001}
3002
3003fn license_model_for_engine(engine: &str) -> &'static str {
3004    match engine {
3005        "mysql" | "mariadb" => "general-public-license",
3006        _ => "postgresql-license",
3007    }
3008}
3009
3010fn default_db_name(engine: &str) -> &'static str {
3011    match engine {
3012        "mysql" | "mariadb" => "mysql",
3013        _ => "postgres",
3014    }
3015}
3016
3017/// Pick the port AWS defaults to for a freshly-created instance of
3018/// `engine`. PostgreSQL lives on 5432; MySQL and MariaDB share 3306.
3019fn default_port_for_engine(engine: &str) -> i32 {
3020    match engine {
3021        "postgres" => 5432,
3022        "mysql" | "mariadb" => 3306,
3023        _ => 5432,
3024    }
3025}
3026
3027/// Pick the built-in parameter group name AWS assigns to a new
3028/// instance when the caller doesn't override it. The name encodes the
3029/// engine family plus its major version (e.g. `default.postgres16`,
3030/// `default.mysql8.0`).
3031fn default_parameter_group(engine: &str, engine_version: &str) -> String {
3032    match engine {
3033        "postgres" => {
3034            let major = engine_version.split('.').next().unwrap_or("16");
3035            format!("default.postgres{}", major)
3036        }
3037        "mysql" => {
3038            let major = if engine_version.starts_with("5.7") {
3039                "5.7"
3040            } else {
3041                "8.0"
3042            };
3043            format!("default.mysql{}", major)
3044        }
3045        "mariadb" => {
3046            let major = if engine_version.starts_with("10.11") {
3047                "10.11"
3048            } else {
3049                "10.6"
3050            };
3051            format!("default.mariadb{}", major)
3052        }
3053        _ => "default.postgres16".to_string(),
3054    }
3055}
3056
3057fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
3058    match error {
3059        RuntimeError::Unavailable => AwsServiceError::aws_error(
3060            StatusCode::SERVICE_UNAVAILABLE,
3061            "InvalidParameterValue",
3062            "Docker/Podman is required for RDS DB instances but is not available",
3063        ),
3064        RuntimeError::ContainerStartFailed(message) => AwsServiceError::aws_error(
3065            StatusCode::INTERNAL_SERVER_ERROR,
3066            "InternalFailure",
3067            message,
3068        ),
3069    }
3070}
3071
3072#[cfg(test)]
3073mod tests {
3074    use std::collections::HashMap;
3075    use std::sync::Arc;
3076
3077    use bytes::Bytes;
3078    use chrono::Utc;
3079    use http::{HeaderMap, Method};
3080    use parking_lot::RwLock;
3081    use uuid::Uuid;
3082
3083    use super::{
3084        db_instance_xml, filter_engine_versions, filter_orderable_options, merge_tags,
3085        optional_i32_param, parse_tag_keys, parse_tags, validate_create_request, RdsService,
3086        RdsSourceType,
3087    };
3088    use crate::state::{default_engine_versions, default_orderable_options, DbInstance, RdsTag};
3089    use fakecloud_core::delivery::DeliveryBus;
3090    use fakecloud_core::service::{AwsRequest, AwsService, AwsServiceError};
3091
3092    #[test]
3093    fn filter_engine_versions_matches_requested_engine() {
3094        let versions = default_engine_versions();
3095
3096        let filtered =
3097            filter_engine_versions(&versions, &Some("postgres".to_string()), &None, &None);
3098
3099        assert_eq!(filtered.len(), 4); // All postgres versions
3100        assert!(filtered.iter().all(|v| v.engine == "postgres"));
3101    }
3102
3103    #[test]
3104    fn filter_orderable_options_respects_instance_class() {
3105        let options = default_orderable_options();
3106
3107        let filtered = filter_orderable_options(
3108            &options,
3109            &Some("postgres".to_string()),
3110            &Some("16.3".to_string()),
3111            &Some("db.t3.micro".to_string()),
3112            &None,
3113            Some(true),
3114        );
3115
3116        assert_eq!(filtered.len(), 1);
3117        assert_eq!(filtered[0].db_instance_class, "db.t3.micro");
3118    }
3119
3120    #[test]
3121    fn validate_create_request_rejects_unsupported_engine() {
3122        let error = validate_create_request("test-db", 20, "db.t3.micro", "mysql", "16.3", 5432)
3123            .expect_err("unsupported engine");
3124
3125        assert_eq!(error.code(), "InvalidParameterValue");
3126    }
3127
3128    #[test]
3129    fn optional_i32_param_rejects_invalid_integer() {
3130        let request = request("CreateDBInstance", &[("Port", "not-a-number")]);
3131
3132        let error = optional_i32_param(&request, "Port").expect_err("invalid port");
3133
3134        assert_eq!(error.code(), "InvalidParameterValue");
3135    }
3136
3137    #[test]
3138    fn db_instance_xml_renders_endpoint_and_status() {
3139        let created_at = Utc::now();
3140        let instance = DbInstance {
3141            db_instance_identifier: "test-db".to_string(),
3142            db_instance_arn: "arn:aws:rds:us-east-1:123456789012:db:test-db".to_string(),
3143            db_instance_class: "db.t3.micro".to_string(),
3144            engine: "postgres".to_string(),
3145            engine_version: "16.3".to_string(),
3146            db_instance_status: "available".to_string(),
3147            master_username: "admin".to_string(),
3148            db_name: Some("appdb".to_string()),
3149            endpoint_address: "127.0.0.1".to_string(),
3150            port: 15432,
3151            allocated_storage: 20,
3152            publicly_accessible: true,
3153            deletion_protection: false,
3154            created_at,
3155            dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3156            master_user_password: "secret123".to_string(),
3157            container_id: "container".to_string(),
3158            host_port: 15432,
3159            tags: Vec::new(),
3160            read_replica_source_db_instance_identifier: None,
3161            read_replica_db_instance_identifiers: Vec::new(),
3162            vpc_security_group_ids: vec!["sg-12345678".to_string()],
3163            db_parameter_group_name: Some("default.postgres16".to_string()),
3164            backup_retention_period: 1,
3165            preferred_backup_window: "03:00-04:00".to_string(),
3166            latest_restorable_time: Some(created_at),
3167            option_group_name: None,
3168            multi_az: false,
3169            pending_modified_values: None,
3170        };
3171
3172        let xml = db_instance_xml(&instance, Some("creating"));
3173
3174        assert!(xml.contains("<DBInstanceIdentifier>test-db</DBInstanceIdentifier>"));
3175        assert!(xml.contains("<DBInstanceStatus>creating</DBInstanceStatus>"));
3176        assert!(xml.contains("<Address>127.0.0.1</Address><Port>15432</Port>"));
3177    }
3178
3179    #[test]
3180    fn parse_tags_reads_rds_query_shape() {
3181        let request = request(
3182            "AddTagsToResource",
3183            &[
3184                ("Tags.Tag.1.Key", "env"),
3185                ("Tags.Tag.1.Value", "dev"),
3186                ("Tags.Tag.2.Key", "team"),
3187                ("Tags.Tag.2.Value", "core"),
3188            ],
3189        );
3190
3191        let tags = parse_tags(&request).expect("tags");
3192
3193        assert_eq!(
3194            tags,
3195            vec![
3196                RdsTag {
3197                    key: "env".to_string(),
3198                    value: "dev".to_string(),
3199                },
3200                RdsTag {
3201                    key: "team".to_string(),
3202                    value: "core".to_string(),
3203                }
3204            ]
3205        );
3206    }
3207
3208    #[test]
3209    fn parse_tag_keys_reads_member_shape() {
3210        let request = request(
3211            "RemoveTagsFromResource",
3212            &[("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")],
3213        );
3214
3215        let tag_keys = parse_tag_keys(&request).expect("tag keys");
3216
3217        assert_eq!(tag_keys, vec!["env".to_string(), "team".to_string()]);
3218    }
3219
3220    #[test]
3221    fn merge_tags_updates_existing_values() {
3222        let mut tags = vec![RdsTag {
3223            key: "env".to_string(),
3224            value: "dev".to_string(),
3225        }];
3226
3227        merge_tags(
3228            &mut tags,
3229            &[
3230                RdsTag {
3231                    key: "env".to_string(),
3232                    value: "prod".to_string(),
3233                },
3234                RdsTag {
3235                    key: "team".to_string(),
3236                    value: "core".to_string(),
3237                },
3238            ],
3239        );
3240
3241        assert_eq!(tags.len(), 2);
3242        assert_eq!(tags[0].value, "prod");
3243        assert_eq!(tags[1].key, "team");
3244    }
3245
3246    #[tokio::test]
3247    async fn describe_engine_versions_returns_xml_body() {
3248        let service = RdsService::new(Arc::new(RwLock::new(
3249            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3250        )));
3251        let request = request("DescribeDBEngineVersions", &[("Engine", "postgres")]);
3252
3253        let response = service.handle(request).await.expect("response");
3254        let body = String::from_utf8(response.body.expect_bytes().to_vec()).expect("utf8");
3255
3256        assert!(body.contains("<DescribeDBEngineVersionsResponse"));
3257        assert!(body.contains("<Engine>postgres</Engine>"));
3258        assert!(body.contains("<DBParameterGroupFamily>postgres16</DBParameterGroupFamily>"));
3259    }
3260
3261    fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
3262        let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
3263        for (key, value) in params {
3264            query_params.insert((*key).to_string(), (*value).to_string());
3265        }
3266
3267        AwsRequest {
3268            service: "rds".to_string(),
3269            action: action.to_string(),
3270            region: "us-east-1".to_string(),
3271            account_id: "123456789012".to_string(),
3272            request_id: "test-request-id".to_string(),
3273            headers: HeaderMap::new(),
3274            query_params,
3275            body: Bytes::new(),
3276            path_segments: vec![],
3277            raw_path: "/".to_string(),
3278            raw_query: String::new(),
3279            method: Method::POST,
3280            is_query_protocol: true,
3281            access_key_id: None,
3282            principal: None,
3283        }
3284    }
3285
3286    // ── Helpers for handler tests ────────────────────────────────────
3287
3288    fn make_service() -> RdsService {
3289        RdsService::new(Arc::new(RwLock::new(
3290            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3291        )))
3292    }
3293
3294    #[derive(Default)]
3295    struct CapturedEvent {
3296        source: String,
3297        detail_type: String,
3298        detail: String,
3299    }
3300
3301    #[derive(Default)]
3302    struct RecordingEb {
3303        events: std::sync::Mutex<Vec<CapturedEvent>>,
3304    }
3305
3306    impl fakecloud_core::delivery::EventBridgeDelivery for RecordingEb {
3307        fn put_event(&self, source: &str, detail_type: &str, detail: &str, _bus: &str) {
3308            self.events.lock().unwrap().push(CapturedEvent {
3309                source: source.to_string(),
3310                detail_type: detail_type.to_string(),
3311                detail: detail.to_string(),
3312            });
3313        }
3314    }
3315
3316    fn make_service_with_recorder() -> (RdsService, Arc<RecordingEb>) {
3317        let recorder = Arc::new(RecordingEb::default());
3318        let bus = Arc::new(DeliveryBus::new().with_eventbridge(recorder.clone()));
3319        let svc = RdsService::new(Arc::new(RwLock::new(
3320            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3321        )))
3322        .with_delivery_bus(bus);
3323        (svc, recorder)
3324    }
3325
3326    #[test]
3327    fn emit_event_emits_aws_rds_event_via_bus() {
3328        let (svc, rec) = make_service_with_recorder();
3329        svc.emit_event(
3330            RdsSourceType::DbInstance,
3331            "my-db",
3332            "arn:aws:rds:us-east-1:123456789012:db:my-db",
3333            "RDS-EVENT-0005",
3334            &["creation"],
3335            "DB instance created",
3336        );
3337        let events = rec.events.lock().unwrap();
3338        assert_eq!(events.len(), 1);
3339        let e = &events[0];
3340        assert_eq!(e.source, "aws.rds");
3341        assert_eq!(e.detail_type, "RDS DB Instance Event");
3342        let detail: serde_json::Value = serde_json::from_str(&e.detail).unwrap();
3343        assert_eq!(detail["EventID"], "RDS-EVENT-0005");
3344        assert_eq!(detail["SourceType"], "DB_INSTANCE");
3345        assert_eq!(detail["SourceIdentifier"], "my-db");
3346        assert_eq!(detail["Message"], "DB instance created");
3347        assert_eq!(detail["EventCategories"][0], "creation");
3348    }
3349
3350    #[test]
3351    fn emit_event_no_op_without_bus() {
3352        let svc = make_service();
3353        svc.emit_event(
3354            RdsSourceType::DbSnapshot,
3355            "snap",
3356            "arn:aws:rds:us-east-1:123456789012:snapshot:snap",
3357            "RDS-EVENT-0042",
3358            &["creation"],
3359            "Manual snapshot created",
3360        );
3361    }
3362
3363    #[test]
3364    fn rds_source_type_detail_type_mapping() {
3365        assert_eq!(
3366            RdsSourceType::DbInstance.detail_type(),
3367            "RDS DB Instance Event"
3368        );
3369        assert_eq!(
3370            RdsSourceType::DbSnapshot.detail_type(),
3371            "RDS DB Snapshot Event"
3372        );
3373        assert_eq!(
3374            RdsSourceType::DbParameterGroup.detail_type(),
3375            "RDS DB Parameter Group Event"
3376        );
3377    }
3378
3379    fn body_of(resp: fakecloud_core::service::AwsResponse) -> String {
3380        String::from_utf8(resp.body.expect_bytes().to_vec()).expect("utf8")
3381    }
3382
3383    fn seed_instance(svc: &RdsService, identifier: &str) -> String {
3384        let arn = format!("arn:aws:rds:us-east-1:123456789012:db:{identifier}");
3385        let mut accounts = svc.state.write();
3386        let state = accounts.default_mut();
3387        state.instances.insert(
3388            identifier.to_string(),
3389            DbInstance {
3390                db_instance_identifier: identifier.to_string(),
3391                db_instance_arn: arn.clone(),
3392                db_instance_class: "db.t3.micro".to_string(),
3393                engine: "postgres".to_string(),
3394                engine_version: "16.3".to_string(),
3395                db_instance_status: "available".to_string(),
3396                master_username: "admin".to_string(),
3397                db_name: Some("appdb".to_string()),
3398                endpoint_address: "127.0.0.1".to_string(),
3399                port: 15432,
3400                allocated_storage: 20,
3401                publicly_accessible: true,
3402                deletion_protection: false,
3403                created_at: Utc::now(),
3404                dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3405                master_user_password: "secret".to_string(),
3406                container_id: "container".to_string(),
3407                host_port: 15432,
3408                tags: Vec::new(),
3409                read_replica_source_db_instance_identifier: None,
3410                read_replica_db_instance_identifiers: Vec::new(),
3411                vpc_security_group_ids: vec!["sg-12345678".to_string()],
3412                db_parameter_group_name: Some("default.postgres16".to_string()),
3413                backup_retention_period: 1,
3414                preferred_backup_window: "03:00-04:00".to_string(),
3415                latest_restorable_time: None,
3416                option_group_name: None,
3417                multi_az: false,
3418                pending_modified_values: None,
3419            },
3420        );
3421        arn
3422    }
3423
3424    fn assert_code<T>(result: Result<T, AwsServiceError>, expected_code: &str) -> AwsServiceError {
3425        match result {
3426            Ok(_) => panic!("expected error {expected_code}, got Ok"),
3427            Err(e) => {
3428                assert_eq!(e.code(), expected_code, "wrong error code");
3429                e
3430            }
3431        }
3432    }
3433
3434    // ── Tag operations ───────────────────────────────────────────────
3435
3436    #[test]
3437    fn add_tags_requires_resource_name() {
3438        let svc = make_service();
3439        let req = request("AddTagsToResource", &[]);
3440        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3441    }
3442
3443    #[test]
3444    fn add_tags_requires_at_least_one_tag() {
3445        let svc = make_service();
3446        let arn = seed_instance(&svc, "db1");
3447        let req = request("AddTagsToResource", &[("ResourceName", arn.as_str())]);
3448        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3449    }
3450
3451    #[test]
3452    fn add_tags_appends_then_list_tags_returns_them() {
3453        let svc = make_service();
3454        let arn = seed_instance(&svc, "db1");
3455        let add_req = request(
3456            "AddTagsToResource",
3457            &[
3458                ("ResourceName", arn.as_str()),
3459                ("Tags.Tag.1.Key", "env"),
3460                ("Tags.Tag.1.Value", "dev"),
3461            ],
3462        );
3463        svc.add_tags_to_resource(&add_req).unwrap();
3464
3465        let list_req = request("ListTagsForResource", &[("ResourceName", arn.as_str())]);
3466        let body = body_of(svc.list_tags_for_resource(&list_req).unwrap());
3467        assert!(body.contains("<Key>env</Key>"));
3468        assert!(body.contains("<Value>dev</Value>"));
3469    }
3470
3471    #[test]
3472    fn list_tags_rejects_filters_param() {
3473        let svc = make_service();
3474        let arn = seed_instance(&svc, "db1");
3475        let req = request(
3476            "ListTagsForResource",
3477            &[
3478                ("ResourceName", arn.as_str()),
3479                ("Filters.Filter.1.Name", "x"),
3480            ],
3481        );
3482        assert_code(svc.list_tags_for_resource(&req), "InvalidParameterValue");
3483    }
3484
3485    #[test]
3486    fn list_tags_unknown_arn_errors() {
3487        let svc = make_service();
3488        let req = request(
3489            "ListTagsForResource",
3490            &[("ResourceName", "arn:aws:rds:us-east-1:123456789012:db:nope")],
3491        );
3492        assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
3493    }
3494
3495    #[test]
3496    fn remove_tags_strips_only_listed_keys() {
3497        let svc = make_service();
3498        let arn = seed_instance(&svc, "db1");
3499        {
3500            let mut __a = svc.state.write();
3501            let state = __a.default_mut();
3502            let inst = state.instances.get_mut("db1").unwrap();
3503            inst.tags = vec![
3504                RdsTag {
3505                    key: "env".to_string(),
3506                    value: "dev".to_string(),
3507                },
3508                RdsTag {
3509                    key: "team".to_string(),
3510                    value: "core".to_string(),
3511                },
3512            ];
3513        }
3514        let req = request(
3515            "RemoveTagsFromResource",
3516            &[("ResourceName", arn.as_str()), ("TagKeys.member.1", "env")],
3517        );
3518        svc.remove_tags_from_resource(&req).unwrap();
3519
3520        let __a = svc.state.read();
3521        let state = __a.default_ref();
3522        let tags = &state.instances.get("db1").unwrap().tags;
3523        assert_eq!(tags.len(), 1);
3524        assert_eq!(tags[0].key, "team");
3525    }
3526
3527    #[test]
3528    fn remove_tags_requires_keys() {
3529        let svc = make_service();
3530        let arn = seed_instance(&svc, "db1");
3531        let req = request("RemoveTagsFromResource", &[("ResourceName", arn.as_str())]);
3532        assert_code(svc.remove_tags_from_resource(&req), "MissingParameter");
3533    }
3534
3535    // ── DB Subnet Groups ─────────────────────────────────────────────
3536
3537    fn create_subnet_group(svc: &RdsService, name: &str) {
3538        let req = request(
3539            "CreateDBSubnetGroup",
3540            &[
3541                ("DBSubnetGroupName", name),
3542                ("DBSubnetGroupDescription", "test"),
3543                ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3544                ("SubnetIds.SubnetIdentifier.2", "subnet-bbb"),
3545            ],
3546        );
3547        svc.create_db_subnet_group(&req).unwrap();
3548    }
3549
3550    #[test]
3551    fn create_db_subnet_group_requires_two_subnets() {
3552        let svc = make_service();
3553        let req = request(
3554            "CreateDBSubnetGroup",
3555            &[
3556                ("DBSubnetGroupName", "sg1"),
3557                ("DBSubnetGroupDescription", "t"),
3558                ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3559            ],
3560        );
3561        assert_code(
3562            svc.create_db_subnet_group(&req),
3563            "DBSubnetGroupDoesNotCoverEnoughAZs",
3564        );
3565    }
3566
3567    #[test]
3568    fn create_db_subnet_group_rejects_empty_subnets() {
3569        let svc = make_service();
3570        let req = request(
3571            "CreateDBSubnetGroup",
3572            &[
3573                ("DBSubnetGroupName", "sg1"),
3574                ("DBSubnetGroupDescription", "t"),
3575            ],
3576        );
3577        assert_code(svc.create_db_subnet_group(&req), "InvalidParameterValue");
3578    }
3579
3580    #[test]
3581    fn create_db_subnet_group_rejects_duplicates() {
3582        let svc = make_service();
3583        create_subnet_group(&svc, "sg1");
3584        let req = request(
3585            "CreateDBSubnetGroup",
3586            &[
3587                ("DBSubnetGroupName", "sg1"),
3588                ("DBSubnetGroupDescription", "t"),
3589                ("SubnetIds.SubnetIdentifier.1", "subnet-x"),
3590                ("SubnetIds.SubnetIdentifier.2", "subnet-y"),
3591            ],
3592        );
3593        assert_code(
3594            svc.create_db_subnet_group(&req),
3595            "DBSubnetGroupAlreadyExists",
3596        );
3597    }
3598
3599    #[test]
3600    fn describe_db_subnet_groups_by_name_or_list() {
3601        let svc = make_service();
3602        create_subnet_group(&svc, "sg-alpha");
3603        create_subnet_group(&svc, "sg-beta");
3604
3605        let by_name = request(
3606            "DescribeDBSubnetGroups",
3607            &[("DBSubnetGroupName", "sg-alpha")],
3608        );
3609        let body = body_of(svc.describe_db_subnet_groups(&by_name).unwrap());
3610        assert!(body.contains("sg-alpha"));
3611        assert!(!body.contains("sg-beta"));
3612
3613        let list_all = request("DescribeDBSubnetGroups", &[]);
3614        let body = body_of(svc.describe_db_subnet_groups(&list_all).unwrap());
3615        assert!(body.contains("sg-alpha"));
3616        assert!(body.contains("sg-beta"));
3617    }
3618
3619    #[test]
3620    fn describe_db_subnet_groups_unknown_name_errors() {
3621        let svc = make_service();
3622        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
3623        assert_code(
3624            svc.describe_db_subnet_groups(&req),
3625            "DBSubnetGroupNotFoundFault",
3626        );
3627    }
3628
3629    #[test]
3630    fn delete_db_subnet_group_unknown_errors() {
3631        let svc = make_service();
3632        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
3633        assert_code(
3634            svc.delete_db_subnet_group(&req),
3635            "DBSubnetGroupNotFoundFault",
3636        );
3637    }
3638
3639    #[test]
3640    fn delete_db_subnet_group_removes_entry() {
3641        let svc = make_service();
3642        create_subnet_group(&svc, "sg1");
3643        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "sg1")]);
3644        svc.delete_db_subnet_group(&req).unwrap();
3645        assert!(svc.state.read().default_ref().subnet_groups.is_empty());
3646    }
3647
3648    #[test]
3649    fn modify_db_subnet_group_updates_subnet_ids() {
3650        let svc = make_service();
3651        create_subnet_group(&svc, "sg1");
3652        let req = request(
3653            "ModifyDBSubnetGroup",
3654            &[
3655                ("DBSubnetGroupName", "sg1"),
3656                ("SubnetIds.SubnetIdentifier.1", "subnet-new1"),
3657                ("SubnetIds.SubnetIdentifier.2", "subnet-new2"),
3658            ],
3659        );
3660        svc.modify_db_subnet_group(&req).unwrap();
3661
3662        let __a = svc.state.read();
3663        let state = __a.default_ref();
3664        let sg = state.subnet_groups.get("sg1").unwrap();
3665        assert_eq!(sg.subnet_ids, vec!["subnet-new1", "subnet-new2"]);
3666    }
3667
3668    // ── DB Parameter Groups ──────────────────────────────────────────
3669
3670    fn create_param_group(svc: &RdsService, name: &str) {
3671        let req = request(
3672            "CreateDBParameterGroup",
3673            &[
3674                ("DBParameterGroupName", name),
3675                ("DBParameterGroupFamily", "postgres16"),
3676                ("Description", "test"),
3677            ],
3678        );
3679        svc.create_db_parameter_group(&req).unwrap();
3680    }
3681
3682    #[test]
3683    fn create_db_parameter_group_rejects_unknown_family() {
3684        let svc = make_service();
3685        let req = request(
3686            "CreateDBParameterGroup",
3687            &[
3688                ("DBParameterGroupName", "pg1"),
3689                ("DBParameterGroupFamily", "oracle19"),
3690                ("Description", "t"),
3691            ],
3692        );
3693        assert_code(svc.create_db_parameter_group(&req), "InvalidParameterValue");
3694    }
3695
3696    #[test]
3697    fn create_db_parameter_group_rejects_duplicates() {
3698        let svc = make_service();
3699        create_param_group(&svc, "pg1");
3700        let req = request(
3701            "CreateDBParameterGroup",
3702            &[
3703                ("DBParameterGroupName", "pg1"),
3704                ("DBParameterGroupFamily", "postgres16"),
3705                ("Description", "t"),
3706            ],
3707        );
3708        assert_code(
3709            svc.create_db_parameter_group(&req),
3710            "DBParameterGroupAlreadyExists",
3711        );
3712    }
3713
3714    #[test]
3715    fn describe_db_parameter_groups_by_name_or_list() {
3716        let svc = make_service();
3717        create_param_group(&svc, "pg-alpha");
3718        create_param_group(&svc, "pg-beta");
3719        let by_name = request(
3720            "DescribeDBParameterGroups",
3721            &[("DBParameterGroupName", "pg-alpha")],
3722        );
3723        let body = body_of(svc.describe_db_parameter_groups(&by_name).unwrap());
3724        assert!(body.contains("pg-alpha"));
3725        assert!(!body.contains("pg-beta"));
3726        let list = request("DescribeDBParameterGroups", &[]);
3727        let body = body_of(svc.describe_db_parameter_groups(&list).unwrap());
3728        assert!(body.contains("pg-alpha"));
3729        assert!(body.contains("pg-beta"));
3730    }
3731
3732    #[test]
3733    fn describe_db_parameter_groups_unknown_name_errors() {
3734        let svc = make_service();
3735        let req = request(
3736            "DescribeDBParameterGroups",
3737            &[("DBParameterGroupName", "ghost")],
3738        );
3739        assert_code(
3740            svc.describe_db_parameter_groups(&req),
3741            "DBParameterGroupNotFound",
3742        );
3743    }
3744
3745    #[test]
3746    fn delete_db_parameter_group_rejects_default_groups() {
3747        let svc = make_service();
3748        let req = request(
3749            "DeleteDBParameterGroup",
3750            &[("DBParameterGroupName", "default.postgres16")],
3751        );
3752        assert_code(svc.delete_db_parameter_group(&req), "InvalidParameterValue");
3753    }
3754
3755    #[test]
3756    fn delete_db_parameter_group_unknown_errors() {
3757        let svc = make_service();
3758        let req = request(
3759            "DeleteDBParameterGroup",
3760            &[("DBParameterGroupName", "ghost")],
3761        );
3762        assert_code(
3763            svc.delete_db_parameter_group(&req),
3764            "DBParameterGroupNotFound",
3765        );
3766    }
3767
3768    #[test]
3769    fn delete_db_parameter_group_removes_entry() {
3770        let svc = make_service();
3771        create_param_group(&svc, "pg1");
3772        let req = request("DeleteDBParameterGroup", &[("DBParameterGroupName", "pg1")]);
3773        svc.delete_db_parameter_group(&req).unwrap();
3774        assert!(!svc
3775            .state
3776            .read()
3777            .default_ref()
3778            .parameter_groups
3779            .contains_key("pg1"));
3780    }
3781
3782    #[test]
3783    fn modify_db_parameter_group_updates_description() {
3784        let svc = make_service();
3785        create_param_group(&svc, "pg1");
3786        let req = request(
3787            "ModifyDBParameterGroup",
3788            &[
3789                ("DBParameterGroupName", "pg1"),
3790                ("Description", "shiny new"),
3791            ],
3792        );
3793        svc.modify_db_parameter_group(&req).unwrap();
3794        let __a = svc.state.read();
3795        let state = __a.default_ref();
3796        assert_eq!(
3797            state.parameter_groups.get("pg1").unwrap().description,
3798            "shiny new"
3799        );
3800    }
3801
3802    #[test]
3803    fn modify_db_parameter_group_unknown_errors() {
3804        let svc = make_service();
3805        let req = request(
3806            "ModifyDBParameterGroup",
3807            &[("DBParameterGroupName", "ghost"), ("Description", "x")],
3808        );
3809        assert_code(
3810            svc.modify_db_parameter_group(&req),
3811            "DBParameterGroupNotFound",
3812        );
3813    }
3814
3815    // ── DescribeDBInstances ──────────────────────────────────────────
3816
3817    #[test]
3818    fn describe_db_instances_by_id_returns_only_one() {
3819        let svc = make_service();
3820        seed_instance(&svc, "db1");
3821        seed_instance(&svc, "db2");
3822        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "db1")]);
3823        let body = body_of(svc.describe_db_instances(&req).unwrap());
3824        assert!(body.contains("<DBInstanceIdentifier>db1</DBInstanceIdentifier>"));
3825        assert!(!body.contains("<DBInstanceIdentifier>db2</DBInstanceIdentifier>"));
3826    }
3827
3828    #[test]
3829    fn describe_db_instances_unknown_id_errors() {
3830        let svc = make_service();
3831        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
3832        assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
3833    }
3834
3835    #[test]
3836    fn describe_db_instances_lists_all_when_unbounded() {
3837        let svc = make_service();
3838        seed_instance(&svc, "db1");
3839        seed_instance(&svc, "db2");
3840        seed_instance(&svc, "db3");
3841        let req = request("DescribeDBInstances", &[]);
3842        let body = body_of(svc.describe_db_instances(&req).unwrap());
3843        for id in ["db1", "db2", "db3"] {
3844            assert!(body.contains(&format!(
3845                "<DBInstanceIdentifier>{id}</DBInstanceIdentifier>"
3846            )));
3847        }
3848    }
3849
3850    // ── ModifyDBInstance ─────────────────────────────────────────────
3851
3852    #[test]
3853    fn modify_db_instance_requires_at_least_one_change() {
3854        let svc = make_service();
3855        seed_instance(&svc, "db1");
3856        let req = request("ModifyDBInstance", &[("DBInstanceIdentifier", "db1")]);
3857        assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
3858    }
3859
3860    #[test]
3861    fn modify_db_instance_unknown_errors() {
3862        let svc = make_service();
3863        let req = request(
3864            "ModifyDBInstance",
3865            &[
3866                ("DBInstanceIdentifier", "ghost"),
3867                ("DBInstanceClass", "db.t3.small"),
3868            ],
3869        );
3870        assert_code(svc.modify_db_instance(&req), "DBInstanceNotFound");
3871    }
3872
3873    #[test]
3874    fn modify_db_instance_apply_immediately_updates_class() {
3875        let svc = make_service();
3876        seed_instance(&svc, "db1");
3877        let req = request(
3878            "ModifyDBInstance",
3879            &[
3880                ("DBInstanceIdentifier", "db1"),
3881                ("DBInstanceClass", "db.t3.small"),
3882                ("ApplyImmediately", "true"),
3883            ],
3884        );
3885        svc.modify_db_instance(&req).unwrap();
3886        let __a = svc.state.read();
3887        let state = __a.default_ref();
3888        assert_eq!(
3889            state.instances.get("db1").unwrap().db_instance_class,
3890            "db.t3.small"
3891        );
3892    }
3893
3894    #[test]
3895    fn modify_db_instance_pending_when_not_apply_immediately() {
3896        let svc = make_service();
3897        seed_instance(&svc, "db1");
3898        let req = request(
3899            "ModifyDBInstance",
3900            &[
3901                ("DBInstanceIdentifier", "db1"),
3902                ("DBInstanceClass", "db.t3.small"),
3903                ("ApplyImmediately", "false"),
3904            ],
3905        );
3906        svc.modify_db_instance(&req).unwrap();
3907        let __a = svc.state.read();
3908        let state = __a.default_ref();
3909        let inst = state.instances.get("db1").unwrap();
3910        assert_eq!(inst.db_instance_class, "db.t3.micro");
3911        assert_eq!(
3912            inst.pending_modified_values
3913                .as_ref()
3914                .unwrap()
3915                .db_instance_class
3916                .as_deref(),
3917            Some("db.t3.small"),
3918        );
3919    }
3920
3921    // ── Snapshots (sync ops only) ────────────────────────────────────
3922
3923    fn seed_snapshot(svc: &RdsService, snapshot_id: &str, instance_id: &str) {
3924        let mut __a = svc.state.write();
3925        let state = __a.default_mut();
3926        let arn = state.db_snapshot_arn(snapshot_id);
3927        state.snapshots.insert(
3928            snapshot_id.to_string(),
3929            crate::state::DbSnapshot {
3930                db_snapshot_identifier: snapshot_id.to_string(),
3931                db_snapshot_arn: arn,
3932                db_instance_identifier: instance_id.to_string(),
3933                snapshot_create_time: Utc::now(),
3934                engine: "postgres".to_string(),
3935                engine_version: "16.3".to_string(),
3936                allocated_storage: 20,
3937                status: "available".to_string(),
3938                port: 5432,
3939                master_username: "admin".to_string(),
3940                db_name: Some("appdb".to_string()),
3941                dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3942                snapshot_type: "manual".to_string(),
3943                master_user_password: "secret".to_string(),
3944                tags: Vec::new(),
3945                dump_data: Vec::new(),
3946            },
3947        );
3948    }
3949
3950    #[test]
3951    fn delete_db_snapshot_removes_entry() {
3952        let svc = make_service();
3953        seed_snapshot(&svc, "snap1", "db1");
3954        let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "snap1")]);
3955        svc.delete_db_snapshot(&req).unwrap();
3956        assert!(svc.state.read().default_ref().snapshots.is_empty());
3957    }
3958
3959    #[test]
3960    fn delete_db_snapshot_unknown_errors() {
3961        let svc = make_service();
3962        let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "ghost")]);
3963        assert_code(svc.delete_db_snapshot(&req), "DBSnapshotNotFound");
3964    }
3965
3966    #[test]
3967    fn describe_db_snapshots_rejects_both_filters() {
3968        let svc = make_service();
3969        let req = request(
3970            "DescribeDBSnapshots",
3971            &[("DBSnapshotIdentifier", "s"), ("DBInstanceIdentifier", "i")],
3972        );
3973        assert_code(
3974            svc.describe_db_snapshots(&req),
3975            "InvalidParameterCombination",
3976        );
3977    }
3978
3979    #[test]
3980    fn describe_db_snapshots_by_id_or_instance() {
3981        let svc = make_service();
3982        seed_snapshot(&svc, "snap1", "db1");
3983        seed_snapshot(&svc, "snap2", "db2");
3984
3985        let by_id = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "snap1")]);
3986        let body = body_of(svc.describe_db_snapshots(&by_id).unwrap());
3987        assert!(body.contains("snap1"));
3988        assert!(!body.contains("snap2"));
3989
3990        let by_instance = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "db2")]);
3991        let body = body_of(svc.describe_db_snapshots(&by_instance).unwrap());
3992        assert!(body.contains("snap2"));
3993        assert!(!body.contains("snap1"));
3994
3995        let list_all = request("DescribeDBSnapshots", &[]);
3996        let body = body_of(svc.describe_db_snapshots(&list_all).unwrap());
3997        assert!(body.contains("snap1"));
3998        assert!(body.contains("snap2"));
3999    }
4000
4001    #[test]
4002    fn describe_db_snapshots_unknown_id_errors() {
4003        let svc = make_service();
4004        let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "ghost")]);
4005        assert_code(svc.describe_db_snapshots(&req), "DBSnapshotNotFound");
4006    }
4007
4008    // ── Error branch tests ──
4009
4010    #[test]
4011    fn describe_db_instances_not_found() {
4012        let svc = make_service();
4013        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
4014        assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
4015    }
4016
4017    #[tokio::test]
4018    async fn delete_db_instance_not_found() {
4019        let svc = make_service();
4020        let req = request(
4021            "DeleteDBInstance",
4022            &[
4023                ("DBInstanceIdentifier", "ghost"),
4024                ("SkipFinalSnapshot", "true"),
4025            ],
4026        );
4027        assert_code(svc.delete_db_instance(&req).await, "DBInstanceNotFound");
4028    }
4029
4030    #[test]
4031    fn modify_db_instance_not_found() {
4032        let svc = make_service();
4033        let req = request(
4034            "ModifyDBInstance",
4035            &[
4036                ("DBInstanceIdentifier", "ghost"),
4037                ("AllocatedStorage", "20"),
4038            ],
4039        );
4040        // Validation fires before existence check
4041        assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
4042    }
4043
4044    #[tokio::test]
4045    async fn reboot_db_instance_not_found() {
4046        let svc = make_service();
4047        let req = request("RebootDBInstance", &[("DBInstanceIdentifier", "ghost")]);
4048        assert_code(svc.reboot_db_instance(&req).await, "DBInstanceNotFound");
4049    }
4050
4051    #[tokio::test]
4052    async fn create_db_snapshot_instance_not_found() {
4053        let svc = make_service();
4054        let req = request(
4055            "CreateDBSnapshot",
4056            &[
4057                ("DBInstanceIdentifier", "ghost"),
4058                ("DBSnapshotIdentifier", "snap1"),
4059            ],
4060        );
4061        assert_code(svc.create_db_snapshot(&req).await, "InvalidParameterValue");
4062    }
4063
4064    #[tokio::test]
4065    async fn restore_db_instance_snapshot_not_found() {
4066        let svc = make_service();
4067        let req = request(
4068            "RestoreDBInstanceFromDBSnapshot",
4069            &[
4070                ("DBInstanceIdentifier", "restored"),
4071                ("DBSnapshotIdentifier", "ghost-snap"),
4072            ],
4073        );
4074        assert_code(
4075            svc.restore_db_instance_from_db_snapshot(&req).await,
4076            "InvalidParameterValue",
4077        );
4078    }
4079
4080    #[tokio::test]
4081    async fn create_db_instance_read_replica_source_not_found() {
4082        let svc = make_service();
4083        let req = request(
4084            "CreateDBInstanceReadReplica",
4085            &[
4086                ("DBInstanceIdentifier", "replica"),
4087                ("SourceDBInstanceIdentifier", "ghost"),
4088            ],
4089        );
4090        assert_code(
4091            svc.create_db_instance_read_replica(&req).await,
4092            "InvalidParameterValue",
4093        );
4094    }
4095
4096    #[test]
4097    fn describe_db_engine_versions_basic() {
4098        let svc = make_service();
4099        let req = request("DescribeDBEngineVersions", &[]);
4100        let resp = svc.describe_db_engine_versions(&req).unwrap();
4101        let body = body_of(resp);
4102        assert!(body.contains("<DBEngineVersions>"));
4103    }
4104
4105    #[test]
4106    fn describe_orderable_db_instance_options_basic() {
4107        let svc = make_service();
4108        let req = request("DescribeOrderableDBInstanceOptions", &[("Engine", "mysql")]);
4109        let resp = svc.describe_orderable_db_instance_options(&req).unwrap();
4110        let body = body_of(resp);
4111        assert!(body.contains("<OrderableDBInstanceOptions>"));
4112    }
4113
4114    #[test]
4115    fn describe_db_parameter_group_not_found() {
4116        let svc = make_service();
4117        let req = request(
4118            "DescribeDBParameterGroups",
4119            &[("DBParameterGroupName", "ghost")],
4120        );
4121        assert_code(
4122            svc.describe_db_parameter_groups(&req),
4123            "DBParameterGroupNotFound",
4124        );
4125    }
4126
4127    #[test]
4128    fn delete_db_parameter_group_not_found() {
4129        let svc = make_service();
4130        let req = request(
4131            "DeleteDBParameterGroup",
4132            &[("DBParameterGroupName", "ghost")],
4133        );
4134        assert_code(
4135            svc.delete_db_parameter_group(&req),
4136            "DBParameterGroupNotFound",
4137        );
4138    }
4139
4140    #[test]
4141    fn describe_db_subnet_group_not_found() {
4142        let svc = make_service();
4143        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4144        assert_code(
4145            svc.describe_db_subnet_groups(&req),
4146            "DBSubnetGroupNotFoundFault",
4147        );
4148    }
4149
4150    #[test]
4151    fn delete_db_subnet_group_not_found() {
4152        let svc = make_service();
4153        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
4154        assert_code(
4155            svc.delete_db_subnet_group(&req),
4156            "DBSubnetGroupNotFoundFault",
4157        );
4158    }
4159
4160    #[test]
4161    fn add_tags_resource_not_found() {
4162        let svc = make_service();
4163        let req = request(
4164            "AddTagsToResource",
4165            &[
4166                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4167                ("Tags.member.1.Key", "k"),
4168                ("Tags.member.1.Value", "v"),
4169            ],
4170        );
4171        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
4172    }
4173
4174    #[test]
4175    fn list_tags_resource_not_found() {
4176        let svc = make_service();
4177        let req = request(
4178            "ListTagsForResource",
4179            &[("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost")],
4180        );
4181        assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
4182    }
4183
4184    // ── snapshot operations ──
4185
4186    #[tokio::test]
4187    async fn create_db_snapshot_missing_id_errors() {
4188        let svc = make_service();
4189        let req = request(
4190            "CreateDBSnapshot",
4191            &[("DBInstanceIdentifier", "nonexistent")],
4192        );
4193        assert_code(svc.create_db_snapshot(&req).await, "MissingParameter");
4194    }
4195
4196    #[tokio::test]
4197    async fn create_db_snapshot_unknown_instance_errors() {
4198        let svc = make_service();
4199        let req = request(
4200            "CreateDBSnapshot",
4201            &[
4202                ("DBSnapshotIdentifier", "snap1"),
4203                ("DBInstanceIdentifier", "ghost"),
4204            ],
4205        );
4206        assert!(svc.create_db_snapshot(&req).await.is_err());
4207    }
4208
4209    // ── delete_db_instance ──
4210
4211    #[tokio::test]
4212    async fn delete_db_instance_missing_id_errors() {
4213        let svc = make_service();
4214        let req = request("DeleteDBInstance", &[]);
4215        assert_code(svc.delete_db_instance(&req).await, "MissingParameter");
4216    }
4217
4218    // ── reboot_db_instance ──
4219
4220    #[tokio::test]
4221    async fn reboot_db_instance_missing_id_errors() {
4222        let svc = make_service();
4223        let req = request("RebootDBInstance", &[]);
4224        assert_code(svc.reboot_db_instance(&req).await, "MissingParameter");
4225    }
4226
4227    // ── create_db_instance validation ──
4228
4229    #[tokio::test]
4230    async fn create_db_instance_missing_id_errors() {
4231        let svc = make_service();
4232        let req = request(
4233            "CreateDBInstance",
4234            &[
4235                ("Engine", "postgres"),
4236                ("DBInstanceClass", "db.t3.micro"),
4237                ("AllocatedStorage", "20"),
4238                ("MasterUsername", "admin"),
4239                ("MasterUserPassword", "secretpass"),
4240            ],
4241        );
4242        assert!(svc.create_db_instance(&req).await.is_err());
4243    }
4244
4245    #[tokio::test]
4246    async fn create_db_instance_unsupported_engine_errors() {
4247        let svc = make_service();
4248        let req = request(
4249            "CreateDBInstance",
4250            &[
4251                ("DBInstanceIdentifier", "db1"),
4252                ("Engine", "mongodb"),
4253                ("DBInstanceClass", "db.t3.micro"),
4254                ("AllocatedStorage", "20"),
4255                ("MasterUsername", "admin"),
4256                ("MasterUserPassword", "secretpass"),
4257            ],
4258        );
4259        assert!(svc.create_db_instance(&req).await.is_err());
4260    }
4261
4262    // ── restore_db_instance_from_db_snapshot ──
4263
4264    #[tokio::test]
4265    async fn restore_db_instance_missing_ids_errors() {
4266        let svc = make_service();
4267        let req = request("RestoreDBInstanceFromDBSnapshot", &[]);
4268        assert!(svc
4269            .restore_db_instance_from_db_snapshot(&req)
4270            .await
4271            .is_err());
4272    }
4273
4274    #[tokio::test]
4275    async fn restore_db_instance_unknown_snapshot_errors() {
4276        let svc = make_service();
4277        let req = request(
4278            "RestoreDBInstanceFromDBSnapshot",
4279            &[
4280                ("DBInstanceIdentifier", "restored"),
4281                ("DBSnapshotIdentifier", "missing"),
4282            ],
4283        );
4284        assert!(svc
4285            .restore_db_instance_from_db_snapshot(&req)
4286            .await
4287            .is_err());
4288    }
4289
4290    // ── create_db_instance_read_replica ──
4291
4292    #[tokio::test]
4293    async fn create_read_replica_missing_source_errors() {
4294        let svc = make_service();
4295        let req = request(
4296            "CreateDBInstanceReadReplica",
4297            &[("DBInstanceIdentifier", "replica1")],
4298        );
4299        assert!(svc.create_db_instance_read_replica(&req).await.is_err());
4300    }
4301
4302    #[tokio::test]
4303    async fn create_read_replica_unknown_source_errors() {
4304        let svc = make_service();
4305        let req = request(
4306            "CreateDBInstanceReadReplica",
4307            &[
4308                ("DBInstanceIdentifier", "replica1"),
4309                ("SourceDBInstanceIdentifier", "ghost"),
4310            ],
4311        );
4312        assert!(svc.create_db_instance_read_replica(&req).await.is_err());
4313    }
4314
4315    // ── describe_db_snapshots with filters ──
4316
4317    #[test]
4318    fn describe_db_snapshots_by_snapshot_id_only() {
4319        let svc = make_service();
4320        seed_snapshot(&svc, "s1", "inst1");
4321        let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "s1")]);
4322        let resp = svc.describe_db_snapshots(&req).unwrap();
4323        let b = body_of(resp);
4324        assert!(b.contains("<DBSnapshotIdentifier>s1</DBSnapshotIdentifier>"));
4325    }
4326
4327    #[test]
4328    fn describe_db_snapshots_by_instance_id_returns_matching() {
4329        let svc = make_service();
4330        seed_snapshot(&svc, "s1", "inst1");
4331        seed_snapshot(&svc, "s2", "inst2");
4332        let req = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "inst1")]);
4333        let resp = svc.describe_db_snapshots(&req).unwrap();
4334        let b = body_of(resp);
4335        assert!(b.contains("s1"));
4336        assert!(!b.contains("<DBSnapshotIdentifier>s2</DBSnapshotIdentifier>"));
4337    }
4338
4339    // ── modify_db_parameter_group ──
4340
4341    #[test]
4342    fn modify_db_parameter_group_missing_name() {
4343        let svc = make_service();
4344        let req = request("ModifyDBParameterGroup", &[]);
4345        assert!(svc.modify_db_parameter_group(&req).is_err());
4346    }
4347
4348    // ── modify_db_subnet_group ──
4349
4350    #[test]
4351    fn modify_db_subnet_group_unknown_errors() {
4352        let svc = make_service();
4353        let req = request(
4354            "ModifyDBSubnetGroup",
4355            &[
4356                ("DBSubnetGroupName", "ghost"),
4357                ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4358                ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4359            ],
4360        );
4361        assert!(svc.modify_db_subnet_group(&req).is_err());
4362    }
4363
4364    // ── describe_db_instances ──
4365
4366    #[test]
4367    fn describe_db_instances_empty_returns_xml() {
4368        let svc = make_service();
4369        let req = request("DescribeDBInstances", &[]);
4370        let resp = svc.describe_db_instances(&req).unwrap();
4371        let b = body_of(resp);
4372        assert!(b.contains("DescribeDBInstancesResult"));
4373    }
4374
4375    #[test]
4376    fn describe_db_snapshots_empty_returns_empty_list() {
4377        let svc = make_service();
4378        let req = request("DescribeDBSnapshots", &[]);
4379        let resp = svc.describe_db_snapshots(&req).unwrap();
4380        let b = body_of(resp);
4381        assert!(b.contains("DescribeDBSnapshotsResult"));
4382    }
4383
4384    #[test]
4385    fn add_tags_unknown_resource_errors() {
4386        let svc = make_service();
4387        let req = request(
4388            "AddTagsToResource",
4389            &[
4390                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4391                ("Tags.member.1.Key", "k"),
4392                ("Tags.member.1.Value", "v"),
4393            ],
4394        );
4395        assert!(svc.add_tags_to_resource(&req).is_err());
4396    }
4397
4398    #[test]
4399    fn remove_tags_unknown_resource_errors() {
4400        let svc = make_service();
4401        let req = request(
4402            "RemoveTagsFromResource",
4403            &[
4404                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4405                ("TagKeys.member.1", "k"),
4406            ],
4407        );
4408        assert!(svc.remove_tags_from_resource(&req).is_err());
4409    }
4410
4411    #[test]
4412    fn create_db_parameter_group_missing_name_errors() {
4413        let svc = make_service();
4414        let req = request(
4415            "CreateDBParameterGroup",
4416            &[
4417                ("DBParameterGroupFamily", "postgres16"),
4418                ("Description", "d"),
4419            ],
4420        );
4421        assert!(svc.create_db_parameter_group(&req).is_err());
4422    }
4423
4424    #[test]
4425    fn create_db_subnet_group_missing_desc_errors() {
4426        let svc = make_service();
4427        let req = request(
4428            "CreateDBSubnetGroup",
4429            &[
4430                ("DBSubnetGroupName", "sg1"),
4431                ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4432                ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4433            ],
4434        );
4435        assert!(svc.create_db_subnet_group(&req).is_err());
4436    }
4437
4438    #[tokio::test]
4439    async fn create_db_instance_missing_class_errors() {
4440        let svc = make_service();
4441        let req = request(
4442            "CreateDBInstance",
4443            &[
4444                ("DBInstanceIdentifier", "miss-class"),
4445                ("Engine", "postgres"),
4446                ("AllocatedStorage", "20"),
4447                ("MasterUsername", "admin"),
4448                ("MasterUserPassword", "secretpass"),
4449            ],
4450        );
4451        assert!(svc.create_db_instance(&req).await.is_err());
4452    }
4453
4454    #[tokio::test]
4455    async fn create_db_instance_missing_master_username_errors() {
4456        let svc = make_service();
4457        let req = request(
4458            "CreateDBInstance",
4459            &[
4460                ("DBInstanceIdentifier", "miss-mu"),
4461                ("Engine", "postgres"),
4462                ("DBInstanceClass", "db.t3.micro"),
4463                ("AllocatedStorage", "20"),
4464                ("MasterUserPassword", "secretpass"),
4465            ],
4466        );
4467        assert!(svc.create_db_instance(&req).await.is_err());
4468    }
4469
4470    #[test]
4471    fn modify_db_instance_missing_id_errors() {
4472        let svc = make_service();
4473        let req = request("ModifyDBInstance", &[]);
4474        assert!(svc.modify_db_instance(&req).is_err());
4475    }
4476
4477    #[test]
4478    fn modify_db_parameter_group_unknown_pg_errors() {
4479        let svc = make_service();
4480        let req = request(
4481            "ModifyDBParameterGroup",
4482            &[
4483                ("DBParameterGroupName", "ghost"),
4484                ("Parameters.member.1.ParameterName", "p"),
4485                ("Parameters.member.1.ParameterValue", "v"),
4486                ("Parameters.member.1.ApplyMethod", "immediate"),
4487            ],
4488        );
4489        assert!(svc.modify_db_parameter_group(&req).is_err());
4490    }
4491
4492    #[test]
4493    fn describe_db_parameter_groups_unknown_errors() {
4494        let svc = make_service();
4495        let req = request(
4496            "DescribeDBParameterGroups",
4497            &[("DBParameterGroupName", "ghost")],
4498        );
4499        assert!(svc.describe_db_parameter_groups(&req).is_err());
4500    }
4501
4502    #[test]
4503    fn describe_db_subnet_groups_unknown_errors() {
4504        let svc = make_service();
4505        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4506        assert!(svc.describe_db_subnet_groups(&req).is_err());
4507    }
4508}