Skip to main content

fakecloud_rds/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20    SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
23const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
25const SUPPORTED_ACTIONS: &[&str] = &[
26    "AddRoleToDBCluster",
27    "AddRoleToDBInstance",
28    "AddSourceIdentifierToSubscription",
29    "AddTagsToResource",
30    "ApplyPendingMaintenanceAction",
31    "AuthorizeDBSecurityGroupIngress",
32    "BacktrackDBCluster",
33    "CancelExportTask",
34    "CopyDBClusterParameterGroup",
35    "CopyDBClusterSnapshot",
36    "CopyDBParameterGroup",
37    "CopyDBSnapshot",
38    "CopyOptionGroup",
39    "CreateBlueGreenDeployment",
40    "CreateCustomDBEngineVersion",
41    "CreateDBCluster",
42    "CreateDBClusterEndpoint",
43    "CreateDBClusterParameterGroup",
44    "CreateDBClusterSnapshot",
45    "CreateDBInstance",
46    "CreateDBInstanceReadReplica",
47    "CreateDBParameterGroup",
48    "CreateDBProxy",
49    "CreateDBProxyEndpoint",
50    "CreateDBSecurityGroup",
51    "CreateDBShardGroup",
52    "CreateDBSnapshot",
53    "CreateDBSubnetGroup",
54    "CreateEventSubscription",
55    "CreateGlobalCluster",
56    "CreateIntegration",
57    "CreateOptionGroup",
58    "CreateTenantDatabase",
59    "DeleteBlueGreenDeployment",
60    "DeleteCustomDBEngineVersion",
61    "DeleteDBCluster",
62    "DeleteDBClusterAutomatedBackup",
63    "DeleteDBClusterEndpoint",
64    "DeleteDBClusterParameterGroup",
65    "DeleteDBClusterSnapshot",
66    "DeleteDBInstance",
67    "DeleteDBInstanceAutomatedBackup",
68    "DeleteDBParameterGroup",
69    "DeleteDBProxy",
70    "DeleteDBProxyEndpoint",
71    "DeleteDBSecurityGroup",
72    "DeleteDBShardGroup",
73    "DeleteDBSnapshot",
74    "DeleteDBSubnetGroup",
75    "DeleteEventSubscription",
76    "DeleteGlobalCluster",
77    "DeleteIntegration",
78    "DeleteOptionGroup",
79    "DeleteTenantDatabase",
80    "DeregisterDBProxyTargets",
81    "DescribeAccountAttributes",
82    "DescribeBlueGreenDeployments",
83    "DescribeCertificates",
84    "DescribeDBClusterAutomatedBackups",
85    "DescribeDBClusterBacktracks",
86    "DescribeDBClusterEndpoints",
87    "DescribeDBClusterParameterGroups",
88    "DescribeDBClusterParameters",
89    "DescribeDBClusterSnapshotAttributes",
90    "DescribeDBClusterSnapshots",
91    "DescribeDBClusters",
92    "DescribeDBEngineVersions",
93    "DescribeDBInstanceAutomatedBackups",
94    "DescribeDBInstances",
95    "DescribeDBLogFiles",
96    "DescribeDBMajorEngineVersions",
97    "DescribeDBParameterGroups",
98    "DescribeDBParameters",
99    "DescribeDBProxies",
100    "DescribeDBProxyEndpoints",
101    "DescribeDBProxyTargetGroups",
102    "DescribeDBProxyTargets",
103    "DescribeDBRecommendations",
104    "DescribeDBSecurityGroups",
105    "DescribeDBShardGroups",
106    "DescribeDBSnapshotAttributes",
107    "DescribeDBSnapshotTenantDatabases",
108    "DescribeDBSnapshots",
109    "DescribeDBSubnetGroups",
110    "DescribeEngineDefaultClusterParameters",
111    "DescribeEngineDefaultParameters",
112    "DescribeEventCategories",
113    "DescribeEventSubscriptions",
114    "DescribeEvents",
115    "DescribeExportTasks",
116    "DescribeGlobalClusters",
117    "DescribeIntegrations",
118    "DescribeOptionGroupOptions",
119    "DescribeOptionGroups",
120    "DescribeOrderableDBInstanceOptions",
121    "DescribePendingMaintenanceActions",
122    "DescribeReservedDBInstances",
123    "DescribeReservedDBInstancesOfferings",
124    "DescribeSourceRegions",
125    "DescribeTenantDatabases",
126    "DescribeValidDBInstanceModifications",
127    "DisableHttpEndpoint",
128    "DownloadDBLogFilePortion",
129    "EnableHttpEndpoint",
130    "FailoverDBCluster",
131    "FailoverGlobalCluster",
132    "ListTagsForResource",
133    "ModifyActivityStream",
134    "ModifyCertificates",
135    "ModifyCurrentDBClusterCapacity",
136    "ModifyCustomDBEngineVersion",
137    "ModifyDBCluster",
138    "ModifyDBClusterEndpoint",
139    "ModifyDBClusterParameterGroup",
140    "ModifyDBClusterSnapshotAttribute",
141    "ModifyDBInstance",
142    "ModifyDBParameterGroup",
143    "ModifyDBProxy",
144    "ModifyDBProxyEndpoint",
145    "ModifyDBProxyTargetGroup",
146    "ModifyDBRecommendation",
147    "ModifyDBShardGroup",
148    "ModifyDBSnapshot",
149    "ModifyDBSnapshotAttribute",
150    "ModifyDBSubnetGroup",
151    "ModifyEventSubscription",
152    "ModifyGlobalCluster",
153    "ModifyIntegration",
154    "ModifyOptionGroup",
155    "ModifyTenantDatabase",
156    "PromoteReadReplica",
157    "PromoteReadReplicaDBCluster",
158    "PurchaseReservedDBInstancesOffering",
159    "RebootDBCluster",
160    "RebootDBInstance",
161    "RebootDBShardGroup",
162    "RegisterDBProxyTargets",
163    "RemoveFromGlobalCluster",
164    "RemoveRoleFromDBCluster",
165    "RemoveRoleFromDBInstance",
166    "RemoveSourceIdentifierFromSubscription",
167    "RemoveTagsFromResource",
168    "ResetDBClusterParameterGroup",
169    "ResetDBParameterGroup",
170    "RestoreDBClusterFromS3",
171    "RestoreDBClusterFromSnapshot",
172    "RestoreDBClusterToPointInTime",
173    "RestoreDBInstanceFromDBSnapshot",
174    "RestoreDBInstanceFromS3",
175    "RestoreDBInstanceToPointInTime",
176    "RevokeDBSecurityGroupIngress",
177    "StartActivityStream",
178    "StartDBCluster",
179    "StartDBInstance",
180    "StartDBInstanceAutomatedBackupsReplication",
181    "StartExportTask",
182    "StopActivityStream",
183    "StopDBCluster",
184    "StopDBInstance",
185    "StopDBInstanceAutomatedBackupsReplication",
186    "SwitchoverBlueGreenDeployment",
187    "SwitchoverGlobalCluster",
188    "SwitchoverReadReplica",
189];
190
191pub struct RdsService {
192    pub(crate) state: SharedRdsState,
193    runtime: Option<Arc<RdsRuntime>>,
194    snapshot_store: Option<Arc<dyn SnapshotStore>>,
195    snapshot_lock: Arc<AsyncMutex<()>>,
196    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
197}
198
199/// Source type for RDS EventBridge events. Maps `aws.rds` detail-type.
200#[derive(Clone, Copy)]
201#[allow(dead_code, clippy::enum_variant_names)]
202pub(crate) enum RdsSourceType {
203    DbInstance,
204    DbSnapshot,
205    DbParameterGroup,
206    DbCluster,
207    DbClusterSnapshot,
208}
209
210impl RdsSourceType {
211    /// EventBridge `SourceType` enum string. Matches the SCREAMING_SNAKE
212    /// form AWS publishes in the `aws.rds` event detail.
213    fn as_str(self) -> &'static str {
214        match self {
215            Self::DbInstance => "DB_INSTANCE",
216            Self::DbSnapshot => "DB_SNAPSHOT",
217            Self::DbParameterGroup => "DB_PARAMETER_GROUP",
218            Self::DbCluster => "DB_CLUSTER",
219            Self::DbClusterSnapshot => "DB_CLUSTER_SNAPSHOT",
220        }
221    }
222
223    /// `DescribeEvents` `SourceType` filter / response value. Per AWS
224    /// API spec this is the kebab-case form (`db-instance`,
225    /// `db-cluster`, `db-snapshot`, `db-parameter-group`, ...) — distinct
226    /// from the EventBridge `SourceType` returned by [`Self::as_str`].
227    pub(crate) fn describe_events_str(self) -> &'static str {
228        match self {
229            Self::DbInstance => "db-instance",
230            Self::DbSnapshot => "db-snapshot",
231            Self::DbParameterGroup => "db-parameter-group",
232            Self::DbCluster => "db-cluster",
233            Self::DbClusterSnapshot => "db-cluster-snapshot",
234        }
235    }
236
237    fn detail_type(self) -> &'static str {
238        match self {
239            Self::DbInstance => "RDS DB Instance Event",
240            Self::DbSnapshot => "RDS DB Snapshot Event",
241            Self::DbParameterGroup => "RDS DB Parameter Group Event",
242            Self::DbCluster => "RDS DB Cluster Event",
243            Self::DbClusterSnapshot => "RDS DB Cluster Snapshot Event",
244        }
245    }
246}
247
248impl RdsService {
249    pub(crate) fn state_handle(&self) -> &SharedRdsState {
250        &self.state
251    }
252}
253
254impl RdsService {
255    pub fn new(state: SharedRdsState) -> Self {
256        Self {
257            state,
258            runtime: None,
259            snapshot_store: None,
260            snapshot_lock: Arc::new(AsyncMutex::new(())),
261            delivery_bus: None,
262        }
263    }
264
265    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
266        self.runtime = Some(runtime);
267        self
268    }
269
270    /// Crate-internal accessor for the optional runtime; needed by the
271    /// extras handler so cluster snapshot/restore paths can dump and
272    /// replay member databases via the live container runtime.
273    pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
274        self.runtime.as_ref()
275    }
276
277    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
278        self.snapshot_store = Some(store);
279        self
280    }
281
282    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
283        self.delivery_bus = Some(bus);
284        self
285    }
286
287    /// Emit an `aws.rds` EventBridge event mirroring the AWS RDS event schema.
288    /// Also records into the per-account events ring so DescribeEvents
289    /// can serve the row. No-op for the EventBridge side when the bus
290    /// isn't wired (tests, minimal configs).
291    pub(crate) fn emit_event(
292        &self,
293        source_type: RdsSourceType,
294        source_identifier: &str,
295        source_arn: &str,
296        event_id: &str,
297        event_categories: &[&str],
298        message: &str,
299    ) {
300        // Source the account_id off the source_arn (segment 4) — that's
301        // the canonical ARN form for RDS resources.
302        let account_id = source_arn.split(':').nth(4).unwrap_or("");
303        emit_event_static_with_state(
304            self.delivery_bus.as_ref(),
305            Some(&self.state),
306            if account_id.is_empty() {
307                None
308            } else {
309                Some(account_id)
310            },
311            source_type,
312            source_identifier,
313            source_arn,
314            event_id,
315            event_categories,
316            message,
317        );
318    }
319
320    async fn save_snapshot(&self) {
321        save_snapshot_static(
322            self.state.clone(),
323            self.snapshot_store.clone(),
324            self.snapshot_lock.clone(),
325        )
326        .await;
327    }
328}
329
330/// Persist the current `RdsState` to the configured snapshot store. Free
331/// function so background tasks (e.g. the create-DB-instance container-start
332/// task) can save without holding a `&RdsService`. Returns immediately when
333/// no store is configured (memory-mode runs).
334async fn save_snapshot_static(
335    state: SharedRdsState,
336    store: Option<Arc<dyn SnapshotStore>>,
337    lock: Arc<AsyncMutex<()>>,
338) {
339    let Some(store) = store else {
340        return;
341    };
342    let _guard = lock.lock().await;
343    let snapshot = RdsSnapshot {
344        schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
345        state: None,
346        accounts: Some(state.read().clone()),
347    };
348    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
349        let bytes = serde_json::to_vec(&snapshot)
350            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
351        store.save(&bytes)
352    })
353    .await;
354    match join {
355        Ok(Ok(())) => {}
356        Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
357        Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
358    }
359}
360
361impl RdsService {
362    /// Return the runtime or a ``ServiceUnavailable`` error if it was not configured.
363    ///
364    /// RDS operations that start, stop, or reach into a database container fail
365    /// with a consistent wire error when the daemon (Docker/Podman) is missing
366    /// rather than each caller restating the message.
367    fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
368        self.runtime.as_ref().ok_or_else(|| {
369            AwsServiceError::aws_error(
370                StatusCode::SERVICE_UNAVAILABLE,
371                "InvalidParameterValue",
372                "Docker/Podman is required for RDS DB instances but is not available",
373            )
374        })
375    }
376}
377
378#[async_trait]
379impl AwsService for RdsService {
380    fn service_name(&self) -> &str {
381        "rds"
382    }
383
384    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
385        let mutates = is_mutating_action(request.action.as_str());
386        let result = match request.action.as_str() {
387            "AddTagsToResource" => self.add_tags_to_resource(&request),
388            "CreateDBInstance" => self.create_db_instance(&request).await,
389            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
390            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
391            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
392            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
393            "DeleteDBInstance" => self.delete_db_instance(&request).await,
394            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
395            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
396            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
397            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
398            "DescribeDBInstances" => self.describe_db_instances(&request),
399            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
400            "DescribeDBParameters" => self.describe_db_parameters_real(&request),
401            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
402            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
403            "DescribeOrderableDBInstanceOptions" => {
404                self.describe_orderable_db_instance_options(&request)
405            }
406            "ListTagsForResource" => self.list_tags_for_resource(&request),
407            "ModifyDBInstance" => self.modify_db_instance(&request),
408            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
409            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
410            "RebootDBInstance" => self.reboot_db_instance(&request).await,
411            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
412            "RestoreDBInstanceFromDBSnapshot" => {
413                self.restore_db_instance_from_db_snapshot(&request).await
414            }
415            "RestoreDBInstanceToPointInTime" => {
416                self.restore_db_instance_to_point_in_time(&request).await
417            }
418            "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
419            "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
420            "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
421            "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
422            "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
423            "RestoreDBClusterToPointInTime" => {
424                self.restore_db_cluster_to_point_in_time(&request).await
425            }
426            _ => self.handle_extra_action(&request),
427        };
428        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
429            self.save_snapshot().await;
430        }
431        result
432    }
433
434    fn supported_actions(&self) -> &[&str] {
435        SUPPORTED_ACTIONS
436    }
437}
438
439impl RdsService {
440    async fn create_db_instance(
441        &self,
442        request: &AwsRequest,
443    ) -> Result<AwsResponse, AwsServiceError> {
444        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
445        let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
446        let db_instance_class = required_query_param(request, "DBInstanceClass")?;
447        let engine = required_query_param(request, "Engine")?;
448        let master_username = required_query_param(request, "MasterUsername")?;
449        let master_user_password = required_query_param(request, "MasterUserPassword")?;
450        let db_name = optional_query_param(request, "DBName");
451        let engine_version =
452            optional_query_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
453        let publicly_accessible =
454            parse_optional_bool(optional_query_param(request, "PubliclyAccessible").as_deref())?
455                .unwrap_or(true);
456        let deletion_protection =
457            parse_optional_bool(optional_query_param(request, "DeletionProtection").as_deref())?
458                .unwrap_or(false);
459        let port = optional_i32_param(request, "Port")?
460            .unwrap_or_else(|| default_port_for_engine(&engine));
461        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
462
463        let db_parameter_group_name = optional_query_param(request, "DBParameterGroupName")
464            .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
465
466        let backup_retention_period =
467            optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
468        let preferred_backup_window = optional_query_param(request, "PreferredBackupWindow")
469            .unwrap_or_else(|| "03:00-04:00".to_string());
470        let option_group_name = optional_query_param(request, "OptionGroupName");
471        let multi_az = parse_optional_bool(optional_query_param(request, "MultiAZ").as_deref())?
472            .unwrap_or(false);
473        let availability_zone = optional_query_param(request, "AvailabilityZone");
474        let storage_type = optional_query_param(request, "StorageType");
475        let storage_encrypted =
476            parse_optional_bool(optional_query_param(request, "StorageEncrypted").as_deref())?
477                .unwrap_or(false);
478        let kms_key_id = optional_query_param(request, "KmsKeyId");
479        let iam_database_authentication_enabled = parse_optional_bool(
480            optional_query_param(request, "EnableIAMDatabaseAuthentication").as_deref(),
481        )?
482        .unwrap_or(false);
483        let iops = optional_i32_param(request, "Iops")?;
484        let monitoring_interval = optional_i32_param(request, "MonitoringInterval")?;
485        let monitoring_role_arn = optional_query_param(request, "MonitoringRoleArn");
486        let performance_insights_enabled = parse_optional_bool(
487            optional_query_param(request, "EnablePerformanceInsights").as_deref(),
488        )?
489        .unwrap_or(false);
490        let performance_insights_kms_key_id =
491            optional_query_param(request, "PerformanceInsightsKMSKeyId");
492        let performance_insights_retention_period =
493            optional_i32_param(request, "PerformanceInsightsRetentionPeriod")?;
494        let enabled_cloudwatch_logs_exports =
495            parse_cloudwatch_logs_exports(request, "EnableCloudwatchLogsExports");
496        let ca_certificate_identifier = optional_query_param(request, "CACertificateIdentifier");
497        let network_type = optional_query_param(request, "NetworkType");
498        let character_set_name = optional_query_param(request, "CharacterSetName");
499        let auto_minor_version_upgrade = parse_optional_bool(
500            optional_query_param(request, "AutoMinorVersionUpgrade").as_deref(),
501        )?;
502        let copy_tags_to_snapshot =
503            parse_optional_bool(optional_query_param(request, "CopyTagsToSnapshot").as_deref())?;
504        let db_cluster_identifier = optional_query_param(request, "DBClusterIdentifier");
505
506        validate_create_request(
507            &db_instance_identifier,
508            allocated_storage,
509            &db_instance_class,
510            &engine,
511            &engine_version,
512            port,
513        )?;
514
515        {
516            let mut accounts = self.state.write();
517            let state = accounts.get_or_create(&request.account_id);
518            if !state.begin_instance_creation(&db_instance_identifier) {
519                return Err(AwsServiceError::aws_error(
520                    StatusCode::BAD_REQUEST,
521                    "DBInstanceAlreadyExists",
522                    format!("DBInstance {} already exists.", db_instance_identifier),
523                ));
524            }
525            // Validate parameter group exists if specified by the caller
526            if let Some(ref pg_name) = db_parameter_group_name {
527                if !state.parameter_groups.contains_key(pg_name) {
528                    state.cancel_instance_creation(&db_instance_identifier);
529                    return Err(AwsServiceError::aws_error(
530                        StatusCode::NOT_FOUND,
531                        "DBParameterGroupNotFound",
532                        format!("DBParameterGroup {} not found.", pg_name),
533                    ));
534                }
535            }
536        }
537
538        let runtime = self.require_runtime()?.clone();
539
540        let logical_db_name = db_name
541            .clone()
542            .unwrap_or_else(|| default_db_name(&engine).to_string());
543
544        // Insert a "creating" placeholder synchronously and spawn the
545        // container start in a background task. CreateDBInstance returns
546        // ~immediately; DescribeDBInstances flips to "available" (or
547        // "failed") when the container is up. Matches AWS RDS behavior:
548        // CreateDBInstance never blocks on the container coming up.
549        let created_at = Utc::now();
550        let instance = {
551            let mut accounts = self.state.write();
552            let state = accounts.get_or_create(&request.account_id);
553            let placeholder = DbInstance {
554                db_instance_identifier: db_instance_identifier.clone(),
555                db_instance_arn: state.db_instance_arn(&db_instance_identifier),
556                db_instance_class: db_instance_class.clone(),
557                engine: engine.clone(),
558                engine_version: engine_version.clone(),
559                db_instance_status: "creating".to_string(),
560                master_username: master_username.clone(),
561                db_name: db_name.clone(),
562                endpoint_address: String::new(),
563                port: 0,
564                allocated_storage,
565                publicly_accessible,
566                deletion_protection,
567                created_at,
568                dbi_resource_id: state.next_dbi_resource_id(),
569                master_user_password: master_user_password.clone(),
570                container_id: String::new(),
571                host_port: 0,
572                tags: Vec::new(),
573                read_replica_source_db_instance_identifier: None,
574                read_replica_db_instance_identifiers: Vec::new(),
575                vpc_security_group_ids,
576                db_parameter_group_name,
577                backup_retention_period,
578                preferred_backup_window,
579                preferred_maintenance_window: None,
580                latest_restorable_time: if backup_retention_period > 0 {
581                    Some(created_at)
582                } else {
583                    None
584                },
585                option_group_name,
586                multi_az,
587                pending_modified_values: None,
588                availability_zone,
589                storage_type,
590                storage_encrypted,
591                kms_key_id,
592                iam_database_authentication_enabled,
593                iops,
594                monitoring_interval,
595                monitoring_role_arn,
596                performance_insights_enabled,
597                performance_insights_kms_key_id,
598                performance_insights_retention_period,
599                enabled_cloudwatch_logs_exports,
600                ca_certificate_identifier,
601                network_type,
602                character_set_name,
603                auto_minor_version_upgrade,
604                copy_tags_to_snapshot,
605                master_user_secret_arn: None,
606                master_user_secret_kms_key_id: None,
607                license_model: None,
608                max_allocated_storage: None,
609                multi_tenant: None,
610                storage_throughput: None,
611                tde_credential_arn: None,
612                delete_automated_backups: None,
613                db_security_groups: Vec::new(),
614                domain: None,
615                domain_fqdn: None,
616                domain_ou: None,
617                domain_iam_role_name: None,
618                domain_auth_secret_arn: None,
619                domain_dns_ips: Vec::new(),
620                db_cluster_identifier: db_cluster_identifier.clone(),
621            };
622            state.finish_instance_creation(placeholder.clone());
623            placeholder
624        };
625        let instance_arn = instance.db_instance_arn.clone();
626
627        self.emit_event(
628            RdsSourceType::DbInstance,
629            &db_instance_identifier,
630            &instance_arn,
631            "RDS-EVENT-0005",
632            &["creation"],
633            "DB instance created",
634        );
635
636        {
637            let state_handle = self.state.clone();
638            let delivery_bus = self.delivery_bus.clone();
639            let runtime = runtime.clone();
640            let id = db_instance_identifier.clone();
641            let engine = engine.clone();
642            let engine_version = engine_version.clone();
643            let master_username = master_username.clone();
644            let master_user_password = master_user_password.clone();
645            let logical_db_name_task = logical_db_name.clone();
646            let account_id = request.account_id.clone();
647            let region = request.region.clone();
648            let arn = instance_arn.clone();
649            let snapshot_store = self.snapshot_store.clone();
650            let snapshot_lock = self.snapshot_lock.clone();
651            let cluster_id_for_attach = db_cluster_identifier.clone();
652            tokio::spawn(async move {
653                match runtime
654                    .ensure_postgres(
655                        &id,
656                        &engine,
657                        &engine_version,
658                        &master_username,
659                        &master_user_password,
660                        &logical_db_name_task,
661                        &account_id,
662                        &region,
663                    )
664                    .await
665                {
666                    Ok(running) => {
667                        // If the cluster has a pending restore dump
668                        // staged by RestoreDBClusterFromSnapshot /
669                        // RestoreDBClusterToPointInTime, drain it and
670                        // replay onto this fresh instance. We do this
671                        // before flipping status to available so the
672                        // first read of "available" sees restored data.
673                        let pending_dump = if let Some(ref cid) = cluster_id_for_attach {
674                            let mut accounts = state_handle.write();
675                            let state = accounts.get_or_create(&account_id);
676                            state
677                                .extras
678                                .get_mut("clusters")
679                                .and_then(|m| m.get_mut(cid))
680                                .and_then(|entry| entry.as_object_mut())
681                                .and_then(|obj| obj.remove("PendingRestoreDumpB64"))
682                                .and_then(|v| v.as_str().map(str::to_string))
683                                .and_then(|b64| {
684                                    use base64::Engine;
685                                    base64::engine::general_purpose::STANDARD
686                                        .decode(b64.as_bytes())
687                                        .ok()
688                                })
689                        } else {
690                            None
691                        };
692                        if let Some(dump) = pending_dump {
693                            if let Err(error) = runtime
694                                .restore_database(
695                                    &id,
696                                    &engine,
697                                    &master_username,
698                                    &master_user_password,
699                                    &logical_db_name_task,
700                                    &dump,
701                                )
702                                .await
703                            {
704                                tracing::error!(%error, db_instance_identifier=%id, "cluster restore dump replay failed");
705                            }
706                        }
707
708                        {
709                            let mut accounts = state_handle.write();
710                            let state = accounts.get_or_create(&account_id);
711                            if let Some(inst) = state.instances.get_mut(&id) {
712                                inst.db_instance_status = "available".to_string();
713                                inst.endpoint_address = "127.0.0.1".to_string();
714                                inst.port = i32::from(running.host_port);
715                                inst.host_port = running.host_port;
716                                inst.container_id = running.container_id;
717                            }
718                            // Register as cluster member so failover /
719                            // restore paths can find the writer.
720                            if let Some(ref cid) = cluster_id_for_attach {
721                                attach_cluster_member(state, cid, &id);
722                            }
723                        }
724                        // Persist the flipped status. Without this the
725                        // synchronous CreateDBInstance save captures the
726                        // `creating` placeholder, which the load path
727                        // discards on restart, dropping the instance.
728                        save_snapshot_static(
729                            state_handle.clone(),
730                            snapshot_store.clone(),
731                            snapshot_lock.clone(),
732                        )
733                        .await;
734                    }
735                    Err(error) => {
736                        tracing::error!(%error, db_instance_identifier=%id, "create_db_instance background task failed");
737                        {
738                            let mut accounts = state_handle.write();
739                            let state = accounts.get_or_create(&account_id);
740                            state.instances.remove(&id);
741                        }
742                        save_snapshot_static(
743                            state_handle.clone(),
744                            snapshot_store.clone(),
745                            snapshot_lock.clone(),
746                        )
747                        .await;
748                        emit_event_static(
749                            delivery_bus.as_ref(),
750                            RdsSourceType::DbInstance,
751                            &id,
752                            &arn,
753                            "RDS-EVENT-0058",
754                            &["failure"],
755                            &format!("DB instance failed to create: {}", error),
756                        );
757                    }
758                }
759            });
760        }
761
762        Ok(AwsResponse::xml(
763            StatusCode::OK,
764            query_response_xml(
765                "CreateDBInstance",
766                RDS_NS,
767                &format!(
768                    "<DBInstance>{}</DBInstance>",
769                    db_instance_xml(&instance, None)
770                ),
771                &request.request_id,
772            ),
773        ))
774    }
775
776    async fn delete_db_instance(
777        &self,
778        request: &AwsRequest,
779    ) -> Result<AwsResponse, AwsServiceError> {
780        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
781        let skip_final_snapshot =
782            parse_optional_bool(optional_query_param(request, "SkipFinalSnapshot").as_deref())?
783                .unwrap_or(false);
784        let final_db_snapshot_identifier =
785            optional_query_param(request, "FinalDBSnapshotIdentifier");
786
787        if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
788            return Err(AwsServiceError::aws_error(
789                StatusCode::BAD_REQUEST,
790                "InvalidParameterCombination",
791                "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
792            ));
793        }
794        if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
795            return Err(AwsServiceError::aws_error(
796                StatusCode::BAD_REQUEST,
797                "InvalidParameterCombination",
798                "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
799            ));
800        }
801
802        // Check deletion protection BEFORE creating snapshot or making any changes
803        {
804            let accounts = self.state.read();
805            let empty = RdsState::new(&request.account_id, &request.region);
806            let state = accounts.get(&request.account_id).unwrap_or(&empty);
807            if let Some(instance) = state.instances.get(&db_instance_identifier) {
808                if instance.deletion_protection {
809                    return Err(AwsServiceError::aws_error(
810                        StatusCode::BAD_REQUEST,
811                        "InvalidDBInstanceState",
812                        format!(
813                            "DBInstance {} cannot be deleted because deletion protection is enabled.",
814                            db_instance_identifier
815                        ),
816                    ));
817                }
818            } else {
819                return Err(db_instance_not_found(&db_instance_identifier));
820            }
821        }
822
823        if let Some(ref snapshot_id) = final_db_snapshot_identifier {
824            self.create_final_db_snapshot(
825                &db_instance_identifier,
826                snapshot_id,
827                &request.account_id,
828                &request.region,
829            )
830            .await?;
831        }
832
833        let instance = {
834            let mut accounts = self.state.write();
835            let state = accounts.get_or_create(&request.account_id);
836            let instance = state
837                .instances
838                .remove(&db_instance_identifier)
839                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
840
841            if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
842                if let Some(source) = state.instances.get_mut(source_id) {
843                    source
844                        .read_replica_db_instance_identifiers
845                        .retain(|id| id != &db_instance_identifier);
846                }
847            }
848
849            for replica_id in &instance.read_replica_db_instance_identifiers {
850                if let Some(replica) = state.instances.get_mut(replica_id) {
851                    replica.read_replica_source_db_instance_identifier = None;
852                }
853            }
854
855            instance
856        };
857
858        if let Some(runtime) = &self.runtime {
859            runtime.stop_container(&db_instance_identifier).await;
860        }
861
862        self.emit_event(
863            RdsSourceType::DbInstance,
864            &db_instance_identifier,
865            &instance.db_instance_arn,
866            "RDS-EVENT-0003",
867            &["deletion"],
868            "DB instance deleted",
869        );
870
871        Ok(AwsResponse::xml(
872            StatusCode::OK,
873            query_response_xml(
874                "DeleteDBInstance",
875                RDS_NS,
876                &format!(
877                    "<DBInstance>{}</DBInstance>",
878                    db_instance_xml(&instance, Some("deleting"))
879                ),
880                &request.request_id,
881            ),
882        ))
883    }
884
885    /// Take a final snapshot of an instance that is about to be deleted,
886    /// persisting the dumped database into `state.snapshots`. The DLQ-style
887    /// conflict check runs twice — once under the read lock before paying
888    /// for the dump, once under the write lock before committing — to keep
889    /// concurrent deletes from colliding.
890    async fn create_final_db_snapshot(
891        &self,
892        db_instance_identifier: &str,
893        snapshot_id: &str,
894        account_id: &str,
895        region: &str,
896    ) -> Result<(), AwsServiceError> {
897        let runtime = self.runtime.as_ref().ok_or_else(|| {
898            AwsServiceError::aws_error(
899                StatusCode::SERVICE_UNAVAILABLE,
900                "InvalidParameterValue",
901                "Docker/Podman is required for RDS snapshots but is not available",
902            )
903        })?;
904
905        let (instance_for_snapshot, db_name) = {
906            let accounts = self.state.read();
907            let empty = RdsState::new(account_id, region);
908            let state = accounts.get(account_id).unwrap_or(&empty);
909
910            if state.snapshots.contains_key(snapshot_id) {
911                return Err(AwsServiceError::aws_error(
912                    StatusCode::CONFLICT,
913                    "DBSnapshotAlreadyExists",
914                    format!("DBSnapshot {snapshot_id} already exists."),
915                ));
916            }
917
918            let instance = state
919                .instances
920                .get(db_instance_identifier)
921                .cloned()
922                .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
923
924            let default_db = default_db_name(&instance.engine);
925            let db_name = instance
926                .db_name
927                .as_deref()
928                .unwrap_or(default_db)
929                .to_string();
930
931            (instance, db_name)
932        };
933
934        let dump_data = runtime
935            .dump_database(
936                db_instance_identifier,
937                &instance_for_snapshot.engine,
938                &instance_for_snapshot.master_username,
939                &instance_for_snapshot.master_user_password,
940                &db_name,
941            )
942            .await
943            .map_err(runtime_error_to_service_error)?;
944
945        let mut accounts = self.state.write();
946        let state = accounts.get_or_create(account_id);
947
948        if state.snapshots.contains_key(snapshot_id) {
949            return Err(AwsServiceError::aws_error(
950                StatusCode::CONFLICT,
951                "DBSnapshotAlreadyExists",
952                format!("DBSnapshot {snapshot_id} already exists."),
953            ));
954        }
955
956        let snapshot_arn = state.db_snapshot_arn(snapshot_id);
957
958        let snapshot = DbSnapshot {
959            db_snapshot_identifier: snapshot_id.to_string(),
960            db_snapshot_arn: snapshot_arn,
961            db_instance_identifier: db_instance_identifier.to_string(),
962            snapshot_create_time: Utc::now(),
963            engine: instance_for_snapshot.engine.clone(),
964            engine_version: instance_for_snapshot.engine_version.clone(),
965            allocated_storage: instance_for_snapshot.allocated_storage,
966            status: "available".to_string(),
967            port: instance_for_snapshot.port,
968            master_username: instance_for_snapshot.master_username.clone(),
969            db_name: instance_for_snapshot.db_name.clone(),
970            dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
971            snapshot_type: "automated".to_string(),
972            master_user_password: instance_for_snapshot.master_user_password.clone(),
973            tags: Vec::new(),
974            dump_data,
975            availability_zone: instance_for_snapshot.availability_zone.clone(),
976            vpc_id: None,
977            instance_create_time: Some(instance_for_snapshot.created_at),
978            license_model: Some(
979                service_helpers::license_model_for_engine(&instance_for_snapshot.engine)
980                    .to_string(),
981            ),
982            iops: instance_for_snapshot.iops,
983            option_group_name: instance_for_snapshot.option_group_name.clone(),
984            percent_progress: Some(100),
985            storage_type: instance_for_snapshot.storage_type.clone(),
986            encrypted: instance_for_snapshot.storage_encrypted,
987            kms_key_id: instance_for_snapshot.kms_key_id.clone(),
988            iam_database_authentication_enabled: instance_for_snapshot
989                .iam_database_authentication_enabled,
990            timezone: None,
991            storage_throughput: None,
992        };
993
994        state.snapshots.insert(snapshot_id.to_string(), snapshot);
995        Ok(())
996    }
997
998    fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
999        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
1000        let apply_immediately =
1001            parse_optional_bool(optional_query_param(request, "ApplyImmediately").as_deref())?;
1002
1003        // Parse every Modify input up front; routing into the always-
1004        // immediate or ApplyImmediately-gated path happens further down.
1005        let deletion_protection =
1006            parse_optional_bool(optional_query_param(request, "DeletionProtection").as_deref())?;
1007        let backup_retention_period =
1008            parse_optional_i32(optional_query_param(request, "BackupRetentionPeriod").as_deref())?;
1009        let preferred_backup_window = optional_query_param(request, "PreferredBackupWindow");
1010        let preferred_maintenance_window =
1011            optional_query_param(request, "PreferredMaintenanceWindow");
1012        let db_parameter_group_name = optional_query_param(request, "DBParameterGroupName");
1013        let master_user_secret_kms_key_id =
1014            optional_query_param(request, "MasterUserSecretKmsKeyId");
1015        let ca_certificate_identifier = optional_query_param(request, "CACertificateIdentifier");
1016        let monitoring_interval =
1017            parse_optional_i32(optional_query_param(request, "MonitoringInterval").as_deref())?;
1018        let option_group_name = optional_query_param(request, "OptionGroupName");
1019        let auto_minor_version_upgrade = parse_optional_bool(
1020            optional_query_param(request, "AutoMinorVersionUpgrade").as_deref(),
1021        )?;
1022        let copy_tags_to_snapshot =
1023            parse_optional_bool(optional_query_param(request, "CopyTagsToSnapshot").as_deref())?;
1024        let delete_automated_backups = parse_optional_bool(
1025            optional_query_param(request, "DeleteAutomatedBackups").as_deref(),
1026        )?;
1027        let enable_iam_db_auth = parse_optional_bool(
1028            optional_query_param(request, "EnableIAMDatabaseAuthentication").as_deref(),
1029        )?;
1030        let max_allocated_storage =
1031            parse_optional_i32(optional_query_param(request, "MaxAllocatedStorage").as_deref())?;
1032        let network_type = optional_query_param(request, "NetworkType");
1033        let domain = optional_query_param(request, "Domain");
1034        let domain_fqdn = optional_query_param(request, "DomainFqdn");
1035        let domain_ou = optional_query_param(request, "DomainOu");
1036        let domain_iam_role_name = optional_query_param(request, "DomainIAMRoleName");
1037        let domain_auth_secret_arn = optional_query_param(request, "DomainAuthSecretArn");
1038        let domain_dns_ips = {
1039            let v = parse_string_member_list(request, "DomainDnsIps");
1040            if v.is_empty() {
1041                None
1042            } else {
1043                Some(v)
1044            }
1045        };
1046        let disable_domain =
1047            parse_optional_bool(optional_query_param(request, "DisableDomain").as_deref())?;
1048        let rotate_master_user_password = parse_optional_bool(
1049            optional_query_param(request, "RotateMasterUserPassword").as_deref(),
1050        )?;
1051
1052        let db_instance_class = optional_query_param(request, "DBInstanceClass");
1053        let master_user_password = optional_query_param(request, "MasterUserPassword");
1054        let engine_version = optional_query_param(request, "EngineVersion");
1055        let allocated_storage =
1056            parse_optional_i32(optional_query_param(request, "AllocatedStorage").as_deref())?;
1057        let multi_az = parse_optional_bool(optional_query_param(request, "MultiAZ").as_deref())?;
1058        let iops = parse_optional_i32(optional_query_param(request, "Iops").as_deref())?;
1059        let storage_type = optional_query_param(request, "StorageType");
1060        let storage_throughput =
1061            parse_optional_i32(optional_query_param(request, "StorageThroughput").as_deref())?;
1062        let performance_insights_enabled = parse_optional_bool(
1063            optional_query_param(request, "EnablePerformanceInsights").as_deref(),
1064        )?;
1065        let license_model = optional_query_param(request, "LicenseModel");
1066        let multi_tenant =
1067            parse_optional_bool(optional_query_param(request, "MultiTenant").as_deref())?;
1068        let publicly_accessible =
1069            parse_optional_bool(optional_query_param(request, "PubliclyAccessible").as_deref())?;
1070        let tde_credential_arn = optional_query_param(request, "TdeCredentialArn");
1071        let db_port_number =
1072            parse_optional_i32(optional_query_param(request, "DBPortNumber").as_deref())?;
1073
1074        // CloudWatch logs exports — AWS lets callers both opt-in to and
1075        // opt-out of specific log types in the same call. We compute the
1076        // resulting set per AWS semantics: start from current, remove
1077        // DisableLogTypes, then union with EnableLogTypes.
1078        let cloudwatch_enable = collect_cloudwatch_log_types(request, "EnableLogTypes");
1079        let cloudwatch_disable = collect_cloudwatch_log_types(request, "DisableLogTypes");
1080        let cloudwatch_changed = !cloudwatch_enable.is_empty() || !cloudwatch_disable.is_empty();
1081
1082        // Parse VPC security group IDs - only if at least one is provided
1083        let vpc_security_group_ids = {
1084            let mut ids = Vec::new();
1085            for index in 1.. {
1086                let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
1087                match optional_query_param(request, &sg_id_name) {
1088                    Some(sg_id) => ids.push(sg_id),
1089                    None => break,
1090                }
1091            }
1092            if ids.is_empty() {
1093                None
1094            } else {
1095                Some(ids)
1096            }
1097        };
1098
1099        // Legacy classic-only DBSecurityGroups list. AWS still accepts the
1100        // parameter even on VPC instances; we record it verbatim.
1101        let db_security_groups = {
1102            let mut ids = Vec::new();
1103            for index in 1.. {
1104                let key = format!("DBSecurityGroups.DBSecurityGroupName.{index}");
1105                match optional_query_param(request, &key) {
1106                    Some(name) => ids.push(name),
1107                    None => break,
1108                }
1109            }
1110            if ids.is_empty() {
1111                None
1112            } else {
1113                Some(ids)
1114            }
1115        };
1116
1117        if let Some(ref class) = db_instance_class {
1118            validate_db_instance_class(class)?;
1119        }
1120
1121        // At-least-one mutable field must be present. We accept every
1122        // mutable RDS Modify input, so we only reject the trivial case
1123        // where the caller supplied just `DBInstanceIdentifier`.
1124        let any_mutable_field = db_instance_class.is_some()
1125            || deletion_protection.is_some()
1126            || vpc_security_group_ids.is_some()
1127            || db_security_groups.is_some()
1128            || master_user_password.is_some()
1129            || backup_retention_period.is_some()
1130            || preferred_backup_window.is_some()
1131            || preferred_maintenance_window.is_some()
1132            || engine_version.is_some()
1133            || allocated_storage.is_some()
1134            || db_parameter_group_name.is_some()
1135            || multi_az.is_some()
1136            || iops.is_some()
1137            || storage_type.is_some()
1138            || storage_throughput.is_some()
1139            || master_user_secret_kms_key_id.is_some()
1140            || ca_certificate_identifier.is_some()
1141            || monitoring_interval.is_some()
1142            || performance_insights_enabled.is_some()
1143            || cloudwatch_changed
1144            || option_group_name.is_some()
1145            || auto_minor_version_upgrade.is_some()
1146            || copy_tags_to_snapshot.is_some()
1147            || delete_automated_backups.is_some()
1148            || enable_iam_db_auth.is_some()
1149            || max_allocated_storage.is_some()
1150            || network_type.is_some()
1151            || license_model.is_some()
1152            || multi_tenant.is_some()
1153            || publicly_accessible.is_some()
1154            || tde_credential_arn.is_some()
1155            || db_port_number.is_some()
1156            || domain.is_some()
1157            || domain_fqdn.is_some()
1158            || domain_ou.is_some()
1159            || domain_iam_role_name.is_some()
1160            || domain_auth_secret_arn.is_some()
1161            || domain_dns_ips.is_some()
1162            || disable_domain.is_some()
1163            || rotate_master_user_password.is_some();
1164        if !any_mutable_field {
1165            return Err(AwsServiceError::aws_error(
1166                StatusCode::BAD_REQUEST,
1167                "InvalidParameterCombination",
1168                "At least one mutable field must be provided.",
1169            ));
1170        }
1171
1172        let mut accounts = self.state.write();
1173        let state = accounts.get_or_create(&request.account_id);
1174        let instance = state
1175            .instances
1176            .get_mut(&db_instance_identifier)
1177            .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1178
1179        // ── Always-immediate fields (ApplyImmediately ignored) ──
1180        if let Some(deletion_protection) = deletion_protection {
1181            instance.deletion_protection = deletion_protection;
1182        }
1183        if let Some(security_group_ids) = vpc_security_group_ids {
1184            instance.vpc_security_group_ids = security_group_ids;
1185        }
1186        if let Some(sg_names) = db_security_groups {
1187            instance.db_security_groups = sg_names;
1188        }
1189        if let Some(ca_id) = ca_certificate_identifier {
1190            instance.ca_certificate_identifier = Some(ca_id);
1191        }
1192        if let Some(kms_key) = master_user_secret_kms_key_id {
1193            instance.master_user_secret_kms_key_id = Some(kms_key);
1194        }
1195        if let Some(name) = option_group_name {
1196            instance.option_group_name = Some(name);
1197        }
1198        if let Some(b) = auto_minor_version_upgrade {
1199            instance.auto_minor_version_upgrade = Some(b);
1200        }
1201        if let Some(b) = copy_tags_to_snapshot {
1202            instance.copy_tags_to_snapshot = Some(b);
1203        }
1204        if let Some(b) = delete_automated_backups {
1205            instance.delete_automated_backups = Some(b);
1206        }
1207        if let Some(b) = enable_iam_db_auth {
1208            instance.iam_database_authentication_enabled = b;
1209        }
1210        if let Some(n) = max_allocated_storage {
1211            instance.max_allocated_storage = Some(n);
1212        }
1213        if let Some(nt) = network_type {
1214            instance.network_type = Some(nt);
1215        }
1216        if disable_domain == Some(true) {
1217            instance.domain = None;
1218            instance.domain_fqdn = None;
1219            instance.domain_ou = None;
1220            instance.domain_iam_role_name = None;
1221            instance.domain_auth_secret_arn = None;
1222            instance.domain_dns_ips.clear();
1223        } else {
1224            if let Some(v) = domain {
1225                instance.domain = Some(v);
1226            }
1227            if let Some(v) = domain_fqdn {
1228                instance.domain_fqdn = Some(v);
1229            }
1230            if let Some(v) = domain_ou {
1231                instance.domain_ou = Some(v);
1232            }
1233            if let Some(v) = domain_iam_role_name {
1234                instance.domain_iam_role_name = Some(v);
1235            }
1236            if let Some(v) = domain_auth_secret_arn {
1237                instance.domain_auth_secret_arn = Some(v);
1238            }
1239            if let Some(v) = domain_dns_ips {
1240                instance.domain_dns_ips = v;
1241            }
1242        }
1243        if cloudwatch_changed {
1244            let mut current: Vec<String> = instance.enabled_cloudwatch_logs_exports.clone();
1245            current.retain(|t| !cloudwatch_disable.contains(t));
1246            for t in &cloudwatch_enable {
1247                if !current.contains(t) {
1248                    current.push(t.clone());
1249                }
1250            }
1251            instance.enabled_cloudwatch_logs_exports = current;
1252        }
1253        // RotateMasterUserPassword: AWS rotates the secret in place. We
1254        // record a marker by bumping a synthetic password — callers don't
1255        // see plaintext, only that the secret status remains active.
1256        if rotate_master_user_password == Some(true) {
1257            instance.master_user_password = format!("rotated-{}", uuid::Uuid::new_v4().simple());
1258        }
1259
1260        // ── ApplyImmediately-gated fields ────────────────────────
1261        let immediate = apply_immediately != Some(false);
1262        if immediate {
1263            if let Some(class) = db_instance_class {
1264                instance.db_instance_class = class;
1265            }
1266            if let Some(pwd) = master_user_password {
1267                instance.master_user_password = pwd;
1268            }
1269            if let Some(version) = engine_version {
1270                instance.engine_version = version;
1271            }
1272            if let Some(storage) = allocated_storage {
1273                instance.allocated_storage = storage;
1274            }
1275            if let Some(name) = db_parameter_group_name {
1276                instance.db_parameter_group_name = Some(name);
1277            }
1278            if let Some(az) = multi_az {
1279                instance.multi_az = az;
1280            }
1281            if let Some(iops_val) = iops {
1282                instance.iops = Some(iops_val);
1283            }
1284            if let Some(stype) = storage_type {
1285                instance.storage_type = Some(stype);
1286            }
1287            if let Some(t) = storage_throughput {
1288                instance.storage_throughput = Some(t);
1289            }
1290            if let Some(pi) = performance_insights_enabled {
1291                instance.performance_insights_enabled = pi;
1292            }
1293            if let Some(lm) = license_model {
1294                instance.license_model = Some(lm);
1295            }
1296            if let Some(b) = multi_tenant {
1297                instance.multi_tenant = Some(b);
1298            }
1299            if let Some(b) = publicly_accessible {
1300                instance.publicly_accessible = b;
1301            }
1302            if let Some(arn) = tde_credential_arn {
1303                instance.tde_credential_arn = Some(arn);
1304            }
1305            if let Some(p) = db_port_number {
1306                instance.port = p;
1307            }
1308            if let Some(retention) = backup_retention_period {
1309                instance.backup_retention_period = retention;
1310            }
1311            if let Some(window) = preferred_backup_window {
1312                instance.preferred_backup_window = window;
1313            }
1314            if let Some(window) = preferred_maintenance_window {
1315                instance.preferred_maintenance_window = Some(window);
1316            }
1317            if let Some(interval) = monitoring_interval {
1318                instance.monitoring_interval = Some(interval);
1319            }
1320        } else {
1321            let any_deferred = db_instance_class.is_some()
1322                || master_user_password.is_some()
1323                || engine_version.is_some()
1324                || allocated_storage.is_some()
1325                || db_parameter_group_name.is_some()
1326                || multi_az.is_some()
1327                || iops.is_some()
1328                || storage_type.is_some()
1329                || storage_throughput.is_some()
1330                || performance_insights_enabled.is_some()
1331                || license_model.is_some()
1332                || multi_tenant.is_some()
1333                || publicly_accessible.is_some()
1334                || tde_credential_arn.is_some()
1335                || db_port_number.is_some()
1336                || backup_retention_period.is_some()
1337                || preferred_backup_window.is_some()
1338                || preferred_maintenance_window.is_some()
1339                || monitoring_interval.is_some();
1340            if any_deferred {
1341                let pending = instance
1342                    .pending_modified_values
1343                    .get_or_insert(Default::default());
1344                if let Some(class) = db_instance_class {
1345                    pending.db_instance_class = Some(class);
1346                }
1347                if let Some(pwd) = master_user_password {
1348                    pending.master_user_password = Some(pwd);
1349                }
1350                if let Some(version) = engine_version {
1351                    pending.engine_version = Some(version);
1352                }
1353                if let Some(storage) = allocated_storage {
1354                    pending.allocated_storage = Some(storage);
1355                }
1356                if let Some(name) = db_parameter_group_name {
1357                    pending.db_parameter_group_name = Some(name);
1358                }
1359                if let Some(az) = multi_az {
1360                    pending.multi_az = Some(az);
1361                }
1362                if let Some(iops_val) = iops {
1363                    pending.iops = Some(iops_val);
1364                }
1365                if let Some(stype) = storage_type {
1366                    pending.storage_type = Some(stype);
1367                }
1368                if let Some(t) = storage_throughput {
1369                    pending.storage_throughput = Some(t);
1370                }
1371                if let Some(pi) = performance_insights_enabled {
1372                    pending.performance_insights_enabled = Some(pi);
1373                }
1374                if let Some(lm) = license_model {
1375                    pending.license_model = Some(lm);
1376                }
1377                if let Some(b) = multi_tenant {
1378                    pending.multi_tenant = Some(b);
1379                }
1380                if let Some(b) = publicly_accessible {
1381                    pending.publicly_accessible = Some(b);
1382                }
1383                if let Some(arn) = tde_credential_arn {
1384                    pending.tde_credential_arn = Some(arn);
1385                }
1386                if let Some(p) = db_port_number {
1387                    pending.port = Some(p);
1388                }
1389                if let Some(retention) = backup_retention_period {
1390                    pending.backup_retention_period = Some(retention);
1391                }
1392                if let Some(window) = preferred_backup_window {
1393                    pending.preferred_backup_window = Some(window);
1394                }
1395                if let Some(window) = preferred_maintenance_window {
1396                    pending.preferred_maintenance_window = Some(window);
1397                }
1398                if let Some(interval) = monitoring_interval {
1399                    pending.monitoring_interval = Some(interval);
1400                }
1401            }
1402        }
1403        let instance_arn = instance.db_instance_arn.clone();
1404        let xml = query_response_xml(
1405            "ModifyDBInstance",
1406            RDS_NS,
1407            &format!(
1408                "<DBInstance>{}</DBInstance>",
1409                db_instance_xml(instance, Some("modifying"))
1410            ),
1411            &request.request_id,
1412        );
1413        drop(accounts);
1414
1415        self.emit_event(
1416            RdsSourceType::DbInstance,
1417            &db_instance_identifier,
1418            &instance_arn,
1419            "RDS-EVENT-0014",
1420            &["configuration change"],
1421            "DB instance was modified",
1422        );
1423
1424        Ok(AwsResponse::xml(StatusCode::OK, xml))
1425    }
1426
1427    async fn reboot_db_instance(
1428        &self,
1429        request: &AwsRequest,
1430    ) -> Result<AwsResponse, AwsServiceError> {
1431        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
1432        let force_failover =
1433            parse_optional_bool(optional_query_param(request, "ForceFailover").as_deref())?;
1434        if force_failover == Some(true) {
1435            return Err(AwsServiceError::aws_error(
1436                StatusCode::BAD_REQUEST,
1437                "InvalidParameterCombination",
1438                "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
1439            ));
1440        }
1441
1442        let instance = {
1443            let accounts = self.state.read();
1444            let empty = RdsState::new(&request.account_id, &request.region);
1445            let state = accounts.get(&request.account_id).unwrap_or(&empty);
1446            state
1447                .instances
1448                .get(&db_instance_identifier)
1449                .cloned()
1450                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
1451        };
1452
1453        let runtime = self.require_runtime()?;
1454
1455        let running = runtime
1456            .restart_container(
1457                &db_instance_identifier,
1458                &instance.engine,
1459                &instance.master_username,
1460                &instance.master_user_password,
1461                instance
1462                    .db_name
1463                    .as_deref()
1464                    .unwrap_or(default_db_name(&instance.engine)),
1465            )
1466            .await
1467            .map_err(runtime_error_to_service_error)?;
1468
1469        let instance = {
1470            let mut accounts = self.state.write();
1471            let state = accounts.get_or_create(&request.account_id);
1472            let instance = state
1473                .instances
1474                .get_mut(&db_instance_identifier)
1475                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1476            instance.host_port = running.host_port;
1477            instance.port = i32::from(running.host_port);
1478
1479            // Apply any pending modifications
1480            if let Some(pending) = instance.pending_modified_values.take() {
1481                apply_pending_to_instance(instance, pending);
1482            }
1483
1484            instance.clone()
1485        };
1486
1487        self.emit_event(
1488            RdsSourceType::DbInstance,
1489            &db_instance_identifier,
1490            &instance.db_instance_arn,
1491            "RDS-EVENT-0006",
1492            &["availability"],
1493            "DB instance restarted",
1494        );
1495
1496        Ok(AwsResponse::xml(
1497            StatusCode::OK,
1498            query_response_xml(
1499                "RebootDBInstance",
1500                RDS_NS,
1501                &format!(
1502                    "<DBInstance>{}</DBInstance>",
1503                    db_instance_xml(&instance, Some("rebooting"))
1504                ),
1505                &request.request_id,
1506            ),
1507        ))
1508    }
1509
1510    fn describe_db_engine_versions(
1511        &self,
1512        request: &AwsRequest,
1513    ) -> Result<AwsResponse, AwsServiceError> {
1514        let engine = optional_query_param(request, "Engine");
1515        let engine_version = optional_query_param(request, "EngineVersion");
1516        let family = optional_query_param(request, "DBParameterGroupFamily");
1517        let default_only =
1518            parse_optional_bool(optional_query_param(request, "DefaultOnly").as_deref())?;
1519
1520        let mut versions = filter_engine_versions(
1521            &default_engine_versions(),
1522            &engine,
1523            &engine_version,
1524            &family,
1525        );
1526
1527        if default_only.unwrap_or(false) {
1528            versions.truncate(1);
1529        }
1530
1531        Ok(AwsResponse::xml(
1532            StatusCode::OK,
1533            query_response_xml(
1534                "DescribeDBEngineVersions",
1535                RDS_NS,
1536                &format!(
1537                    "<DBEngineVersions>{}</DBEngineVersions>",
1538                    versions.iter().map(engine_version_xml).collect::<String>()
1539                ),
1540                &request.request_id,
1541            ),
1542        ))
1543    }
1544
1545    fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1546        let db_instance_identifier = optional_query_param(request, "DBInstanceIdentifier");
1547        let marker = optional_query_param(request, "Marker");
1548        let max_records = optional_query_param(request, "MaxRecords");
1549
1550        let accounts = self.state.read();
1551        let empty = RdsState::new(&request.account_id, &request.region);
1552        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1553
1554        // If specific identifier requested, return just that one (no pagination)
1555        if let Some(identifier) = db_instance_identifier {
1556            let instance = state
1557                .instances
1558                .get(&identifier)
1559                .cloned()
1560                .ok_or_else(|| db_instance_not_found(&identifier))?;
1561
1562            return Ok(AwsResponse::xml(
1563                StatusCode::OK,
1564                query_response_xml(
1565                    "DescribeDBInstances",
1566                    RDS_NS,
1567                    &format!(
1568                        "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
1569                        db_instance_xml(&instance, None)
1570                    ),
1571                    &request.request_id,
1572                ),
1573            ));
1574        }
1575
1576        // Get all instances sorted by created_at, then identifier
1577        let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
1578        instances.sort_by(|a, b| {
1579            a.created_at
1580                .cmp(&b.created_at)
1581                .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
1582        });
1583
1584        // Apply pagination
1585        let paginated = paginate(instances, marker, max_records, |inst| {
1586            &inst.db_instance_identifier
1587        })?;
1588
1589        let marker_xml = paginated
1590            .next_marker
1591            .as_ref()
1592            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1593            .unwrap_or_default();
1594
1595        Ok(AwsResponse::xml(
1596            StatusCode::OK,
1597            query_response_xml(
1598                "DescribeDBInstances",
1599                RDS_NS,
1600                &format!(
1601                    "<DBInstances>{}</DBInstances>{}",
1602                    paginated
1603                        .items
1604                        .iter()
1605                        .map(|instance| {
1606                            format!(
1607                                "<DBInstance>{}</DBInstance>",
1608                                db_instance_xml(instance, None)
1609                            )
1610                        })
1611                        .collect::<String>(),
1612                    marker_xml
1613                ),
1614                &request.request_id,
1615            ),
1616        ))
1617    }
1618
1619    fn describe_orderable_db_instance_options(
1620        &self,
1621        request: &AwsRequest,
1622    ) -> Result<AwsResponse, AwsServiceError> {
1623        let engine = optional_query_param(request, "Engine");
1624        let engine_version = optional_query_param(request, "EngineVersion");
1625        let db_instance_class = optional_query_param(request, "DBInstanceClass");
1626        let license_model = optional_query_param(request, "LicenseModel");
1627        let vpc = parse_optional_bool(optional_query_param(request, "Vpc").as_deref())?;
1628
1629        let options = filter_orderable_options(
1630            &default_orderable_options(),
1631            &engine,
1632            &engine_version,
1633            &db_instance_class,
1634            &license_model,
1635            vpc,
1636        );
1637
1638        Ok(AwsResponse::xml(
1639            StatusCode::OK,
1640            query_response_xml(
1641                "DescribeOrderableDBInstanceOptions",
1642                RDS_NS,
1643                &format!(
1644                    "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
1645                    options.iter().map(orderable_option_xml).collect::<String>()
1646                ),
1647                &request.request_id,
1648            ),
1649        ))
1650    }
1651
1652    fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1653        let resource_name = required_query_param(request, "ResourceName")?;
1654        let tags = parse_tags(request)?;
1655
1656        if tags.is_empty() {
1657            return Err(AwsServiceError::aws_error(
1658                StatusCode::BAD_REQUEST,
1659                "MissingParameter",
1660                "The request must contain the parameter Tags.",
1661            ));
1662        }
1663
1664        let mut accounts = self.state.write();
1665        let state = accounts.get_or_create(&request.account_id);
1666        let mut target = resolve_tag_target_mut(state, &resource_name)?;
1667        target.merge(&tags);
1668
1669        Ok(AwsResponse::xml(
1670            StatusCode::OK,
1671            query_response_xml("AddTagsToResource", RDS_NS, "", &request.request_id),
1672        ))
1673    }
1674
1675    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1676        let resource_name = required_query_param(request, "ResourceName")?;
1677        if query_param_prefix_exists(request, "Filters.") {
1678            return Err(AwsServiceError::aws_error(
1679                StatusCode::BAD_REQUEST,
1680                "InvalidParameterValue",
1681                "Filters are not yet supported for ListTagsForResource.",
1682            ));
1683        }
1684
1685        let accounts = self.state.read();
1686        let empty = RdsState::new(&request.account_id, &request.region);
1687        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1688        let target = resolve_tag_target(state, &resource_name)?;
1689        let tag_xml = target.to_xml();
1690
1691        Ok(AwsResponse::xml(
1692            StatusCode::OK,
1693            query_response_xml(
1694                "ListTagsForResource",
1695                RDS_NS,
1696                &format!("<TagList>{tag_xml}</TagList>"),
1697                &request.request_id,
1698            ),
1699        ))
1700    }
1701
1702    fn remove_tags_from_resource(
1703        &self,
1704        request: &AwsRequest,
1705    ) -> Result<AwsResponse, AwsServiceError> {
1706        let resource_name = required_query_param(request, "ResourceName")?;
1707        let tag_keys = parse_tag_keys(request)?;
1708
1709        if tag_keys.is_empty() {
1710            return Err(AwsServiceError::aws_error(
1711                StatusCode::BAD_REQUEST,
1712                "MissingParameter",
1713                "The request must contain the parameter TagKeys.",
1714            ));
1715        }
1716
1717        let mut accounts = self.state.write();
1718        let state = accounts.get_or_create(&request.account_id);
1719        let mut target = resolve_tag_target_mut(state, &resource_name)?;
1720        target.remove_keys(&tag_keys);
1721
1722        Ok(AwsResponse::xml(
1723            StatusCode::OK,
1724            query_response_xml("RemoveTagsFromResource", RDS_NS, "", &request.request_id),
1725        ))
1726    }
1727
1728    async fn create_db_snapshot(
1729        &self,
1730        request: &AwsRequest,
1731    ) -> Result<AwsResponse, AwsServiceError> {
1732        let db_snapshot_identifier = required_query_param(request, "DBSnapshotIdentifier")?;
1733        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
1734
1735        let runtime = self.runtime.as_ref().ok_or_else(|| {
1736            AwsServiceError::aws_error(
1737                StatusCode::SERVICE_UNAVAILABLE,
1738                "InvalidParameterValue",
1739                "Docker/Podman is required for RDS snapshots but is not available",
1740            )
1741        })?;
1742
1743        let (instance, db_name) = {
1744            let accounts = self.state.read();
1745            let empty = RdsState::new(&request.account_id, &request.region);
1746            let state = accounts.get(&request.account_id).unwrap_or(&empty);
1747
1748            if state.snapshots.contains_key(&db_snapshot_identifier) {
1749                return Err(AwsServiceError::aws_error(
1750                    StatusCode::CONFLICT,
1751                    "DBSnapshotAlreadyExists",
1752                    format!("DBSnapshot {db_snapshot_identifier} already exists."),
1753                ));
1754            }
1755
1756            let instance = state
1757                .instances
1758                .get(&db_instance_identifier)
1759                .cloned()
1760                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1761
1762            let default_db = default_db_name(&instance.engine);
1763            let db_name = instance
1764                .db_name
1765                .as_deref()
1766                .unwrap_or(default_db)
1767                .to_string();
1768
1769            (instance, db_name)
1770        };
1771
1772        let dump_data = runtime
1773            .dump_database(
1774                &db_instance_identifier,
1775                &instance.engine,
1776                &instance.master_username,
1777                &instance.master_user_password,
1778                &db_name,
1779            )
1780            .await
1781            .map_err(runtime_error_to_service_error)?;
1782
1783        let mut accounts = self.state.write();
1784        let state = accounts.get_or_create(&request.account_id);
1785
1786        if state.snapshots.contains_key(&db_snapshot_identifier) {
1787            return Err(AwsServiceError::aws_error(
1788                StatusCode::CONFLICT,
1789                "DBSnapshotAlreadyExists",
1790                format!("DBSnapshot {db_snapshot_identifier} already exists."),
1791            ));
1792        }
1793
1794        let snapshot = DbSnapshot {
1795            db_snapshot_identifier: db_snapshot_identifier.clone(),
1796            db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
1797            db_instance_identifier: instance.db_instance_identifier.clone(),
1798            snapshot_create_time: Utc::now(),
1799            engine: instance.engine.clone(),
1800            engine_version: instance.engine_version.clone(),
1801            allocated_storage: instance.allocated_storage,
1802            status: "available".to_string(),
1803            port: instance.port,
1804            master_username: instance.master_username.clone(),
1805            db_name: instance.db_name.clone(),
1806            dbi_resource_id: instance.dbi_resource_id.clone(),
1807            snapshot_type: "manual".to_string(),
1808            master_user_password: instance.master_user_password.clone(),
1809            tags: Vec::new(),
1810            dump_data,
1811            availability_zone: instance.availability_zone.clone(),
1812            vpc_id: None,
1813            instance_create_time: Some(instance.created_at),
1814            license_model: Some(
1815                service_helpers::license_model_for_engine(&instance.engine).to_string(),
1816            ),
1817            iops: instance.iops,
1818            option_group_name: instance.option_group_name.clone(),
1819            percent_progress: Some(100),
1820            storage_type: instance.storage_type.clone(),
1821            encrypted: instance.storage_encrypted,
1822            kms_key_id: instance.kms_key_id.clone(),
1823            iam_database_authentication_enabled: instance.iam_database_authentication_enabled,
1824            timezone: None,
1825            storage_throughput: None,
1826        };
1827
1828        state
1829            .snapshots
1830            .insert(db_snapshot_identifier.clone(), snapshot.clone());
1831        let snapshot_arn = snapshot.db_snapshot_arn.clone();
1832        drop(accounts);
1833
1834        self.emit_event(
1835            RdsSourceType::DbSnapshot,
1836            &db_snapshot_identifier,
1837            &snapshot_arn,
1838            "RDS-EVENT-0042",
1839            &["creation"],
1840            "Manual snapshot created",
1841        );
1842
1843        Ok(AwsResponse::xml(
1844            StatusCode::OK,
1845            query_response_xml(
1846                "CreateDBSnapshot",
1847                RDS_NS,
1848                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1849                &request.request_id,
1850            ),
1851        ))
1852    }
1853
1854    fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1855        let db_snapshot_identifier = optional_query_param(request, "DBSnapshotIdentifier");
1856        let db_instance_identifier = optional_query_param(request, "DBInstanceIdentifier");
1857        let marker = optional_query_param(request, "Marker");
1858        let max_records = optional_query_param(request, "MaxRecords");
1859
1860        if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
1861            return Err(AwsServiceError::aws_error(
1862                StatusCode::BAD_REQUEST,
1863                "InvalidParameterCombination",
1864                "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
1865            ));
1866        }
1867
1868        let accounts = self.state.read();
1869        let empty = RdsState::new(&request.account_id, &request.region);
1870        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1871
1872        // If specific snapshot requested, return just that one (no pagination)
1873        if let Some(snapshot_id) = db_snapshot_identifier {
1874            let snapshot = state
1875                .snapshots
1876                .get(&snapshot_id)
1877                .cloned()
1878                .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
1879
1880            return Ok(AwsResponse::xml(
1881                StatusCode::OK,
1882                query_response_xml(
1883                    "DescribeDBSnapshots",
1884                    RDS_NS,
1885                    &format!(
1886                        "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
1887                        db_snapshot_xml(&snapshot)
1888                    ),
1889                    &request.request_id,
1890                ),
1891            ));
1892        }
1893
1894        // Get snapshots, filtered by instance identifier if provided
1895        let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
1896            state
1897                .snapshots
1898                .values()
1899                .filter(|s| s.db_instance_identifier == instance_id)
1900                .cloned()
1901                .collect()
1902        } else {
1903            state.snapshots.values().cloned().collect()
1904        };
1905
1906        // Sort by creation time, then identifier
1907        snapshots.sort_by(|a, b| {
1908            a.snapshot_create_time
1909                .cmp(&b.snapshot_create_time)
1910                .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
1911        });
1912
1913        // Apply pagination
1914        let paginated = paginate(snapshots, marker, max_records, |snap| {
1915            &snap.db_snapshot_identifier
1916        })?;
1917
1918        let marker_xml = paginated
1919            .next_marker
1920            .as_ref()
1921            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1922            .unwrap_or_default();
1923
1924        Ok(AwsResponse::xml(
1925            StatusCode::OK,
1926            query_response_xml(
1927                "DescribeDBSnapshots",
1928                RDS_NS,
1929                &format!(
1930                    "<DBSnapshots>{}</DBSnapshots>{}",
1931                    paginated
1932                        .items
1933                        .iter()
1934                        .map(|snapshot| format!(
1935                            "<DBSnapshot>{}</DBSnapshot>",
1936                            db_snapshot_xml(snapshot)
1937                        ))
1938                        .collect::<String>(),
1939                    marker_xml
1940                ),
1941                &request.request_id,
1942            ),
1943        ))
1944    }
1945
1946    fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1947        let db_snapshot_identifier = required_query_param(request, "DBSnapshotIdentifier")?;
1948
1949        let mut accounts = self.state.write();
1950        let state = accounts.get_or_create(&request.account_id);
1951
1952        let snapshot = state
1953            .snapshots
1954            .remove(&db_snapshot_identifier)
1955            .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1956        let snapshot_arn = snapshot.db_snapshot_arn.clone();
1957        drop(accounts);
1958
1959        self.emit_event(
1960            RdsSourceType::DbSnapshot,
1961            &db_snapshot_identifier,
1962            &snapshot_arn,
1963            "RDS-EVENT-0041",
1964            &["deletion"],
1965            "Manual snapshot deleted",
1966        );
1967
1968        Ok(AwsResponse::xml(
1969            StatusCode::OK,
1970            query_response_xml(
1971                "DeleteDBSnapshot",
1972                RDS_NS,
1973                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1974                &request.request_id,
1975            ),
1976        ))
1977    }
1978
1979    async fn restore_db_instance_from_db_snapshot(
1980        &self,
1981        request: &AwsRequest,
1982    ) -> Result<AwsResponse, AwsServiceError> {
1983        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
1984        let db_snapshot_identifier = required_query_param(request, "DBSnapshotIdentifier")?;
1985        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
1986        let tags = parse_tags(request)?;
1987
1988        let runtime = self.require_runtime()?;
1989
1990        let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
1991            let mut accounts = self.state.write();
1992            let state = accounts.get_or_create(&request.account_id);
1993
1994            if !state.begin_instance_creation(&db_instance_identifier) {
1995                return Err(AwsServiceError::aws_error(
1996                    StatusCode::CONFLICT,
1997                    "DBInstanceAlreadyExists",
1998                    format!("DBInstance {db_instance_identifier} already exists."),
1999                ));
2000            }
2001
2002            let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
2003                Some(s) => s,
2004                None => {
2005                    state.cancel_instance_creation(&db_instance_identifier);
2006                    return Err(db_snapshot_not_found(&db_snapshot_identifier));
2007                }
2008            };
2009
2010            let dbi_resource_id = state.next_dbi_resource_id();
2011            let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
2012            let created_at = Utc::now();
2013
2014            (snapshot, dbi_resource_id, db_instance_arn, created_at)
2015        };
2016
2017        let db_name = snapshot
2018            .db_name
2019            .as_deref()
2020            .unwrap_or(default_db_name(&snapshot.engine));
2021        let running = match runtime
2022            .ensure_postgres(
2023                &db_instance_identifier,
2024                &snapshot.engine,
2025                &snapshot.engine_version,
2026                &snapshot.master_username,
2027                &snapshot.master_user_password,
2028                db_name,
2029                &request.account_id,
2030                &request.region,
2031            )
2032            .await
2033        {
2034            Ok(running) => running,
2035            Err(e) => {
2036                self.state
2037                    .write()
2038                    .get_or_create(&request.account_id)
2039                    .cancel_instance_creation(&db_instance_identifier);
2040                return Err(runtime_error_to_service_error(e));
2041            }
2042        };
2043
2044        if let Err(e) = runtime
2045            .restore_database(
2046                &db_instance_identifier,
2047                &snapshot.engine,
2048                &snapshot.master_username,
2049                &snapshot.master_user_password,
2050                db_name,
2051                &snapshot.dump_data,
2052            )
2053            .await
2054        {
2055            self.state
2056                .write()
2057                .get_or_create(&request.account_id)
2058                .cancel_instance_creation(&db_instance_identifier);
2059            runtime.stop_container(&db_instance_identifier).await;
2060            return Err(runtime_error_to_service_error(e));
2061        }
2062
2063        let instance = build_restored_instance(
2064            &db_instance_identifier,
2065            db_instance_arn,
2066            dbi_resource_id,
2067            created_at,
2068            vpc_security_group_ids,
2069            &snapshot,
2070            &running,
2071            tags,
2072        );
2073
2074        self.state
2075            .write()
2076            .get_or_create(&request.account_id)
2077            .finish_instance_creation(instance.clone());
2078
2079        self.emit_event(
2080            RdsSourceType::DbInstance,
2081            &db_instance_identifier,
2082            &instance.db_instance_arn,
2083            "RDS-EVENT-0043",
2084            &["creation"],
2085            "DB instance restored from snapshot",
2086        );
2087
2088        Ok(AwsResponse::xml(
2089            StatusCode::OK,
2090            query_response_xml(
2091                "RestoreDBInstanceFromDBSnapshot",
2092                RDS_NS,
2093                &format!(
2094                    "<DBInstance>{}</DBInstance>",
2095                    db_instance_xml(&instance, None)
2096                ),
2097                &request.request_id,
2098            ),
2099        ))
2100    }
2101
2102    async fn create_db_instance_read_replica(
2103        &self,
2104        request: &AwsRequest,
2105    ) -> Result<AwsResponse, AwsServiceError> {
2106        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
2107        let source_db_instance_identifier =
2108            required_query_param(request, "SourceDBInstanceIdentifier")?;
2109
2110        let runtime = self.runtime.as_ref().ok_or_else(|| {
2111            AwsServiceError::aws_error(
2112                StatusCode::SERVICE_UNAVAILABLE,
2113                "InvalidParameterValue",
2114                "Docker/Podman is required for RDS read replicas but is not available",
2115            )
2116        })?;
2117
2118        let (source_instance, db_name) = {
2119            let mut accounts = self.state.write();
2120            let state = accounts.get_or_create(&request.account_id);
2121
2122            if !state.begin_instance_creation(&db_instance_identifier) {
2123                return Err(AwsServiceError::aws_error(
2124                    StatusCode::CONFLICT,
2125                    "DBInstanceAlreadyExists",
2126                    format!("DBInstance {db_instance_identifier} already exists."),
2127                ));
2128            }
2129
2130            let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
2131            {
2132                Some(inst) => inst,
2133                None => {
2134                    state.cancel_instance_creation(&db_instance_identifier);
2135                    return Err(db_instance_not_found(&source_db_instance_identifier));
2136                }
2137            };
2138
2139            let default_db = default_db_name(&source_instance.engine);
2140            let db_name = source_instance
2141                .db_name
2142                .as_deref()
2143                .unwrap_or(default_db)
2144                .to_string();
2145
2146            (source_instance, db_name)
2147        };
2148
2149        let dump_data = match runtime
2150            .dump_database(
2151                &source_db_instance_identifier,
2152                &source_instance.engine,
2153                &source_instance.master_username,
2154                &source_instance.master_user_password,
2155                &db_name,
2156            )
2157            .await
2158        {
2159            Ok(data) => data,
2160            Err(e) => {
2161                self.state
2162                    .write()
2163                    .get_or_create(&request.account_id)
2164                    .cancel_instance_creation(&db_instance_identifier);
2165                return Err(runtime_error_to_service_error(e));
2166            }
2167        };
2168
2169        let (dbi_resource_id, db_instance_arn) = {
2170            let accounts = self.state.read();
2171            let empty = RdsState::new(&request.account_id, &request.region);
2172            let s = accounts.get(&request.account_id).unwrap_or(&empty);
2173            (
2174                s.next_dbi_resource_id(),
2175                s.db_instance_arn(&db_instance_identifier),
2176            )
2177        };
2178        let created_at = Utc::now();
2179
2180        let running = match runtime
2181            .ensure_postgres(
2182                &db_instance_identifier,
2183                &source_instance.engine,
2184                &source_instance.engine_version,
2185                &source_instance.master_username,
2186                &source_instance.master_user_password,
2187                &db_name,
2188                &request.account_id,
2189                &request.region,
2190            )
2191            .await
2192        {
2193            Ok(running) => running,
2194            Err(e) => {
2195                self.state
2196                    .write()
2197                    .get_or_create(&request.account_id)
2198                    .cancel_instance_creation(&db_instance_identifier);
2199                return Err(runtime_error_to_service_error(e));
2200            }
2201        };
2202
2203        if let Err(e) = runtime
2204            .restore_database(
2205                &db_instance_identifier,
2206                &source_instance.engine,
2207                &source_instance.master_username,
2208                &source_instance.master_user_password,
2209                &db_name,
2210                &dump_data,
2211            )
2212            .await
2213        {
2214            self.state
2215                .write()
2216                .get_or_create(&request.account_id)
2217                .cancel_instance_creation(&db_instance_identifier);
2218            runtime.stop_container(&db_instance_identifier).await;
2219            return Err(runtime_error_to_service_error(e));
2220        }
2221
2222        let replica = build_read_replica_instance(
2223            &db_instance_identifier,
2224            db_instance_arn,
2225            dbi_resource_id,
2226            created_at,
2227            &source_db_instance_identifier,
2228            &source_instance,
2229            &running,
2230        );
2231
2232        let source_missing = {
2233            let mut accounts = self.state.write();
2234            let state = accounts.get_or_create(&request.account_id);
2235            match state.instances.get_mut(&source_db_instance_identifier) {
2236                Some(source) => {
2237                    source
2238                        .read_replica_db_instance_identifiers
2239                        .push(db_instance_identifier.clone());
2240                    state.finish_instance_creation(replica.clone());
2241                    false
2242                }
2243                None => {
2244                    state.cancel_instance_creation(&db_instance_identifier);
2245                    true
2246                }
2247            }
2248        };
2249
2250        if source_missing {
2251            runtime.stop_container(&db_instance_identifier).await;
2252            return Err(db_instance_not_found(&source_db_instance_identifier));
2253        }
2254
2255        self.emit_event(
2256            RdsSourceType::DbInstance,
2257            &db_instance_identifier,
2258            &replica.db_instance_arn,
2259            "RDS-EVENT-0005",
2260            &["creation", "read replica"],
2261            "Read replica DB instance created",
2262        );
2263
2264        Ok(AwsResponse::xml(
2265            StatusCode::OK,
2266            query_response_xml(
2267                "CreateDBInstanceReadReplica",
2268                RDS_NS,
2269                &format!(
2270                    "<DBInstance>{}</DBInstance>",
2271                    db_instance_xml(&replica, None)
2272                ),
2273                &request.request_id,
2274            ),
2275        ))
2276    }
2277
2278    async fn restore_db_instance_to_point_in_time(
2279        &self,
2280        request: &AwsRequest,
2281    ) -> Result<AwsResponse, AwsServiceError> {
2282        let target_id = required_query_param(request, "TargetDBInstanceIdentifier")?;
2283        let source_id = required_query_param(request, "SourceDBInstanceIdentifier")?;
2284        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
2285        let tags = parse_tags(request)?;
2286
2287        let (source_instance, db_name) = {
2288            let mut accounts = self.state.write();
2289            let state = accounts.get_or_create(&request.account_id);
2290
2291            if !state.begin_instance_creation(&target_id) {
2292                return Err(AwsServiceError::aws_error(
2293                    StatusCode::CONFLICT,
2294                    "DBInstanceAlreadyExists",
2295                    format!("DBInstance {target_id} already exists."),
2296                ));
2297            }
2298
2299            let source_instance = match state.instances.get(&source_id).cloned() {
2300                Some(inst) => inst,
2301                None => {
2302                    state.cancel_instance_creation(&target_id);
2303                    return Err(db_instance_not_found(&source_id));
2304                }
2305            };
2306
2307            let default_db = default_db_name(&source_instance.engine);
2308            let db_name = source_instance
2309                .db_name
2310                .as_deref()
2311                .unwrap_or(default_db)
2312                .to_string();
2313
2314            (source_instance, db_name)
2315        };
2316
2317        let runtime = match self.require_runtime() {
2318            Ok(rt) => rt,
2319            Err(e) => {
2320                self.state
2321                    .write()
2322                    .get_or_create(&request.account_id)
2323                    .cancel_instance_creation(&target_id);
2324                return Err(e);
2325            }
2326        };
2327
2328        let dump_data = match runtime
2329            .dump_database(
2330                &source_id,
2331                &source_instance.engine,
2332                &source_instance.master_username,
2333                &source_instance.master_user_password,
2334                &db_name,
2335            )
2336            .await
2337        {
2338            Ok(data) => data,
2339            Err(e) => {
2340                self.state
2341                    .write()
2342                    .get_or_create(&request.account_id)
2343                    .cancel_instance_creation(&target_id);
2344                return Err(runtime_error_to_service_error(e));
2345            }
2346        };
2347
2348        let (dbi_resource_id, db_instance_arn) = {
2349            let accounts = self.state.read();
2350            let empty = RdsState::new(&request.account_id, &request.region);
2351            let s = accounts.get(&request.account_id).unwrap_or(&empty);
2352            (s.next_dbi_resource_id(), s.db_instance_arn(&target_id))
2353        };
2354        let created_at = Utc::now();
2355
2356        let running = match runtime
2357            .ensure_postgres(
2358                &target_id,
2359                &source_instance.engine,
2360                &source_instance.engine_version,
2361                &source_instance.master_username,
2362                &source_instance.master_user_password,
2363                &db_name,
2364                &request.account_id,
2365                &request.region,
2366            )
2367            .await
2368        {
2369            Ok(running) => running,
2370            Err(e) => {
2371                self.state
2372                    .write()
2373                    .get_or_create(&request.account_id)
2374                    .cancel_instance_creation(&target_id);
2375                return Err(runtime_error_to_service_error(e));
2376            }
2377        };
2378
2379        if let Err(e) = runtime
2380            .restore_database(
2381                &target_id,
2382                &source_instance.engine,
2383                &source_instance.master_username,
2384                &source_instance.master_user_password,
2385                &db_name,
2386                &dump_data,
2387            )
2388            .await
2389        {
2390            self.state
2391                .write()
2392                .get_or_create(&request.account_id)
2393                .cancel_instance_creation(&target_id);
2394            runtime.stop_container(&target_id).await;
2395            return Err(runtime_error_to_service_error(e));
2396        }
2397
2398        let restore_to_time = required_query_param(request, "RestoreTime")
2399            .ok()
2400            .or_else(|| required_query_param(request, "RestoreToTime").ok());
2401        let use_latest = required_query_param(request, "UseLatestRestorableTime")
2402            .ok()
2403            .map(|s| s.eq_ignore_ascii_case("true"))
2404            .unwrap_or(false);
2405
2406        let mut instance = build_pit_restored_instance(
2407            &target_id,
2408            db_instance_arn,
2409            dbi_resource_id,
2410            created_at,
2411            vpc_security_group_ids,
2412            &source_instance,
2413            &running,
2414            tags,
2415        );
2416
2417        if let Some(t) = restore_to_time.as_ref() {
2418            if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(t) {
2419                instance.latest_restorable_time = Some(parsed.with_timezone(&Utc));
2420            }
2421        } else if use_latest {
2422            instance.latest_restorable_time = source_instance.latest_restorable_time;
2423        }
2424
2425        self.state
2426            .write()
2427            .get_or_create(&request.account_id)
2428            .finish_instance_creation(instance.clone());
2429
2430        self.emit_event(
2431            RdsSourceType::DbInstance,
2432            &target_id,
2433            &instance.db_instance_arn,
2434            "RDS-EVENT-0008",
2435            &["creation"],
2436            "DB instance restored to point in time",
2437        );
2438
2439        Ok(AwsResponse::xml(
2440            StatusCode::OK,
2441            query_response_xml(
2442                "RestoreDBInstanceToPointInTime",
2443                RDS_NS,
2444                &format!(
2445                    "<DBInstance>{}</DBInstance>",
2446                    db_instance_xml(&instance, None)
2447                ),
2448                &request.request_id,
2449            ),
2450        ))
2451    }
2452
2453    async fn restore_db_instance_from_s3(
2454        &self,
2455        request: &AwsRequest,
2456    ) -> Result<AwsResponse, AwsServiceError> {
2457        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
2458        let s3_bucket = required_query_param(request, "S3BucketName")?;
2459        let s3_prefix = optional_query_param(request, "S3Prefix").unwrap_or_default();
2460        let master_username = required_query_param(request, "MasterUsername")?;
2461        let master_user_password = required_query_param(request, "MasterUserPassword")?;
2462        let engine = required_query_param(request, "Engine")?;
2463        let engine_version = optional_query_param(request, "EngineVersion")
2464            .or_else(|| optional_query_param(request, "SourceEngineVersion"))
2465            .unwrap_or_else(|| match engine.as_str() {
2466                "postgres" => "16.3".to_string(),
2467                "mysql" => "8.0".to_string(),
2468                "mariadb" => "10.6".to_string(),
2469                _ => "0".to_string(),
2470            });
2471        let allocated_storage = optional_query_param(request, "AllocatedStorage")
2472            .and_then(|s| s.parse::<i32>().ok())
2473            .unwrap_or(20);
2474        let db_instance_class = optional_query_param(request, "DBInstanceClass")
2475            .unwrap_or_else(|| "db.t3.micro".to_string());
2476        let db_name_opt = optional_query_param(request, "DBName");
2477        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
2478        let tags = parse_tags(request)?;
2479
2480        let bus = self.delivery_bus.as_ref().ok_or_else(|| {
2481            AwsServiceError::aws_error(
2482                StatusCode::SERVICE_UNAVAILABLE,
2483                "InvalidParameterValue",
2484                "S3 client not wired into RDS service",
2485            )
2486        })?;
2487
2488        let dump_data = bus
2489            .get_object_from_s3(&request.account_id, &s3_bucket, &s3_prefix)
2490            .map_err(|e| {
2491                AwsServiceError::aws_error(
2492                    StatusCode::BAD_REQUEST,
2493                    "InvalidS3BucketFault",
2494                    format!("S3 backup at {s3_bucket}/{s3_prefix} unavailable: {e}"),
2495                )
2496            })?;
2497
2498        let runtime = self.require_runtime()?;
2499
2500        let (dbi_resource_id, db_instance_arn) = {
2501            let mut accounts = self.state.write();
2502            let state = accounts.get_or_create(&request.account_id);
2503
2504            if !state.begin_instance_creation(&db_instance_identifier) {
2505                return Err(AwsServiceError::aws_error(
2506                    StatusCode::CONFLICT,
2507                    "DBInstanceAlreadyExists",
2508                    format!("DBInstance {db_instance_identifier} already exists."),
2509                ));
2510            }
2511
2512            (
2513                state.next_dbi_resource_id(),
2514                state.db_instance_arn(&db_instance_identifier),
2515            )
2516        };
2517
2518        let db_name = db_name_opt.unwrap_or_else(|| default_db_name(&engine).to_string());
2519        let created_at = Utc::now();
2520
2521        let running = match runtime
2522            .ensure_postgres(
2523                &db_instance_identifier,
2524                &engine,
2525                &engine_version,
2526                &master_username,
2527                &master_user_password,
2528                &db_name,
2529                &request.account_id,
2530                &request.region,
2531            )
2532            .await
2533        {
2534            Ok(running) => running,
2535            Err(e) => {
2536                self.state
2537                    .write()
2538                    .get_or_create(&request.account_id)
2539                    .cancel_instance_creation(&db_instance_identifier);
2540                return Err(runtime_error_to_service_error(e));
2541            }
2542        };
2543
2544        if let Err(e) = runtime
2545            .restore_database(
2546                &db_instance_identifier,
2547                &engine,
2548                &master_username,
2549                &master_user_password,
2550                &db_name,
2551                &dump_data,
2552            )
2553            .await
2554        {
2555            self.state
2556                .write()
2557                .get_or_create(&request.account_id)
2558                .cancel_instance_creation(&db_instance_identifier);
2559            runtime.stop_container(&db_instance_identifier).await;
2560            return Err(runtime_error_to_service_error(e));
2561        }
2562
2563        let instance = build_s3_restored_instance(
2564            &db_instance_identifier,
2565            db_instance_arn,
2566            dbi_resource_id,
2567            created_at,
2568            allocated_storage,
2569            db_instance_class,
2570            engine.clone(),
2571            engine_version,
2572            master_username,
2573            master_user_password,
2574            db_name,
2575            vpc_security_group_ids,
2576            &running,
2577            tags,
2578        );
2579
2580        self.state
2581            .write()
2582            .get_or_create(&request.account_id)
2583            .finish_instance_creation(instance.clone());
2584
2585        self.emit_event(
2586            RdsSourceType::DbInstance,
2587            &db_instance_identifier,
2588            &instance.db_instance_arn,
2589            "RDS-EVENT-0043",
2590            &["creation"],
2591            "DB instance restored from S3 backup",
2592        );
2593
2594        Ok(AwsResponse::xml(
2595            StatusCode::OK,
2596            query_response_xml(
2597                "RestoreDBInstanceFromS3",
2598                RDS_NS,
2599                &format!(
2600                    "<DBInstance>{}</DBInstance>",
2601                    db_instance_xml(&instance, None)
2602                ),
2603                &request.request_id,
2604            ),
2605        ))
2606    }
2607
2608    /// Real CreateDBClusterSnapshot: locates the cluster's writer
2609    /// member, dumps its database synchronously via the runtime, and
2610    /// stores the dump alongside the snapshot's metadata so a later
2611    /// RestoreDBClusterFromSnapshot can replay the exact state.
2612    async fn create_db_cluster_snapshot(
2613        &self,
2614        request: &AwsRequest,
2615    ) -> Result<AwsResponse, AwsServiceError> {
2616        use serde_json::json;
2617        let snapshot_id = required_query_param(request, "DBClusterSnapshotIdentifier")?;
2618        let cluster_id = required_query_param(request, "DBClusterIdentifier")?;
2619        let arn = format!(
2620            "arn:aws:rds:{}:{}:cluster-snapshot:{}",
2621            request.region, request.account_id, snapshot_id
2622        );
2623
2624        let writer_info = {
2625            let accounts = self.state.read();
2626            accounts.get(&request.account_id).and_then(|state| {
2627                let cluster_entry = state.extras.get("clusters")?.get(&cluster_id)?;
2628                let writer_id = cluster_entry
2629                    .get("WriterDBInstanceIdentifier")
2630                    .and_then(|v| v.as_str())
2631                    .map(str::to_string)
2632                    .or_else(|| {
2633                        cluster_entry
2634                            .get("DBClusterMembers")
2635                            .and_then(|m| m.as_array())
2636                            .and_then(|arr| {
2637                                arr.iter()
2638                                    .find(|m| m["IsClusterWriter"].as_bool() == Some(true))
2639                                    .or_else(|| arr.first())
2640                                    .and_then(|m| m["DBInstanceIdentifier"].as_str())
2641                                    .map(str::to_string)
2642                            })
2643                    })?;
2644                let inst = state.instances.get(&writer_id)?;
2645                Some((
2646                    inst.db_instance_identifier.clone(),
2647                    inst.engine.clone(),
2648                    inst.master_username.clone(),
2649                    inst.master_user_password.clone(),
2650                    inst.db_name
2651                        .clone()
2652                        .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
2653                ))
2654            })
2655        };
2656
2657        let dump_b64 = if let Some((wid, eng, user, pass, db)) = writer_info {
2658            if let Some(runtime) = self.runtime_ref() {
2659                match runtime.dump_database(&wid, &eng, &user, &pass, &db).await {
2660                    Ok(data) => {
2661                        use base64::Engine;
2662                        Some(base64::engine::general_purpose::STANDARD.encode(&data))
2663                    }
2664                    Err(e) => {
2665                        tracing::warn!(
2666                            error = %e,
2667                            cluster = %cluster_id,
2668                            writer = %wid,
2669                            "cluster snapshot dump failed; falling back to metadata-only snapshot"
2670                        );
2671                        None
2672                    }
2673                }
2674            } else {
2675                None
2676            }
2677        } else {
2678            None
2679        };
2680
2681        {
2682            let mut accounts = self.state.write();
2683            let state = accounts.get_or_create(&request.account_id);
2684            let mut entry = state
2685                .extras
2686                .get("clusters")
2687                .and_then(|m| m.get(&cluster_id))
2688                .cloned()
2689                .unwrap_or_else(|| json!({}));
2690            if let Some(obj) = entry.as_object_mut() {
2691                obj.insert(
2692                    "DBClusterSnapshotIdentifier".to_string(),
2693                    json!(snapshot_id),
2694                );
2695                obj.insert("DBClusterSnapshotArn".to_string(), json!(arn));
2696                obj.insert("DBClusterIdentifier".to_string(), json!(cluster_id));
2697                obj.insert("Status".to_string(), json!("available"));
2698                obj.insert("SnapshotType".to_string(), json!("manual"));
2699                if let Some(b64) = dump_b64.as_ref() {
2700                    obj.insert("DumpDataB64".to_string(), json!(b64));
2701                }
2702            }
2703            state
2704                .extras
2705                .entry("cluster_snapshots".to_string())
2706                .or_default()
2707                .insert(snapshot_id.clone(), entry);
2708        }
2709
2710        self.emit_event(
2711            RdsSourceType::DbClusterSnapshot,
2712            &snapshot_id,
2713            &arn,
2714            "RDS-EVENT-0074",
2715            &["backup"],
2716            "DB cluster snapshot created",
2717        );
2718
2719        Ok(AwsResponse::xml(
2720            StatusCode::OK,
2721            query_response_xml(
2722                "CreateDBClusterSnapshot",
2723                RDS_NS,
2724                &crate::extras::cluster_snapshot_xml(&snapshot_id, &arn, &cluster_id),
2725                &request.request_id,
2726            ),
2727        ))
2728    }
2729
2730    /// Real RestoreDBClusterFromSnapshot: clones the source cluster's
2731    /// metadata into the new cluster id, strips member tracking so the
2732    /// restored cluster starts empty, and stages the snapshot's dump
2733    /// data on the new cluster so the next CreateDBInstance with this
2734    /// cluster id replays it onto the fresh writer.
2735    async fn restore_db_cluster_from_snapshot(
2736        &self,
2737        request: &AwsRequest,
2738    ) -> Result<AwsResponse, AwsServiceError> {
2739        use serde_json::json;
2740        let target = required_query_param(request, "DBClusterIdentifier")?;
2741        let snapshot_id = optional_query_param(request, "SnapshotIdentifier")
2742            .or_else(|| optional_query_param(request, "DBClusterSnapshotIdentifier"))
2743            .ok_or_else(|| {
2744                AwsServiceError::aws_error(
2745                    StatusCode::BAD_REQUEST,
2746                    "MissingParameter",
2747                    "SnapshotIdentifier is required",
2748                )
2749            })?;
2750        let arn = format!(
2751            "arn:aws:rds:{}:{}:cluster:{}",
2752            request.region, request.account_id, target
2753        );
2754
2755        let mut accounts = self.state.write();
2756        let state = accounts.get_or_create(&request.account_id);
2757        let snapshot = state
2758            .extras
2759            .get("cluster_snapshots")
2760            .and_then(|m| m.get(&snapshot_id))
2761            .cloned()
2762            .ok_or_else(|| {
2763                AwsServiceError::aws_error(
2764                    StatusCode::NOT_FOUND,
2765                    "DBClusterSnapshotNotFoundFault",
2766                    format!("DBClusterSnapshot {snapshot_id} not found."),
2767                )
2768            })?;
2769        let source_cluster_id = snapshot
2770            .get("DBClusterIdentifier")
2771            .and_then(|v| v.as_str())
2772            .unwrap_or("")
2773            .to_string();
2774        let pending_dump_b64 = snapshot
2775            .get("DumpDataB64")
2776            .and_then(|v| v.as_str())
2777            .map(str::to_string);
2778
2779        let mut entry = state
2780            .extras
2781            .get("clusters")
2782            .and_then(|m| m.get(&source_cluster_id))
2783            .cloned()
2784            .unwrap_or_else(|| {
2785                json!({
2786                    "Engine": optional_query_param(request, "Engine").unwrap_or_else(|| "aurora-postgresql".to_string()),
2787                    "EngineVersion": optional_query_param(request, "EngineVersion").unwrap_or_else(|| "15.3".to_string()),
2788                    "MasterUsername": "postgres",
2789                    "Port": 5432,
2790                })
2791            });
2792        if let Some(obj) = entry.as_object_mut() {
2793            obj.insert("DBClusterIdentifier".to_string(), json!(target));
2794            obj.insert("DBClusterArn".to_string(), json!(arn));
2795            obj.insert("Status".to_string(), json!("available"));
2796            obj.insert(
2797                "Endpoint".to_string(),
2798                json!(format!(
2799                    "{target}.cluster-xxx.{}.rds.amazonaws.com",
2800                    request.region
2801                )),
2802            );
2803            obj.insert(
2804                "ReaderEndpoint".to_string(),
2805                json!(format!(
2806                    "{target}.cluster-ro-xxx.{}.rds.amazonaws.com",
2807                    request.region
2808                )),
2809            );
2810            obj.remove("ReplicationSourceIdentifier");
2811            obj.remove("DBClusterMembers");
2812            obj.remove("WriterDBInstanceIdentifier");
2813            obj.remove("DBClusterSnapshotIdentifier");
2814            obj.remove("DBClusterSnapshotArn");
2815            obj.remove("DumpDataB64");
2816            if let Some(engine) = optional_query_param(request, "Engine") {
2817                obj.insert("Engine".to_string(), json!(engine));
2818            }
2819            if let Some(version) = optional_query_param(request, "EngineVersion") {
2820                obj.insert("EngineVersion".to_string(), json!(version));
2821            }
2822            if let Some(port) =
2823                optional_query_param(request, "Port").and_then(|p| p.parse::<i64>().ok())
2824            {
2825                obj.insert("Port".to_string(), json!(port));
2826            }
2827            if let Some(b64) = pending_dump_b64 {
2828                obj.insert("PendingRestoreDumpB64".to_string(), json!(b64));
2829            }
2830        }
2831        state
2832            .extras
2833            .entry("clusters".to_string())
2834            .or_default()
2835            .insert(target.clone(), entry);
2836        drop(accounts);
2837
2838        self.emit_event(
2839            RdsSourceType::DbCluster,
2840            &target,
2841            &arn,
2842            "RDS-EVENT-0170",
2843            &["creation"],
2844            "DB cluster restored from snapshot",
2845        );
2846
2847        Ok(AwsResponse::xml(
2848            StatusCode::OK,
2849            query_response_xml(
2850                "RestoreDBClusterFromSnapshot",
2851                RDS_NS,
2852                &crate::extras::db_cluster_xml(&target, &arn),
2853                &request.request_id,
2854            ),
2855        ))
2856    }
2857
2858    /// Real RestoreDBClusterToPointInTime: dumps the source cluster's
2859    /// writer live, clones the source cluster's metadata to the new
2860    /// id, and stages the dump on the new cluster so the first
2861    /// CreateDBInstance attached to it replays the data.
2862    async fn restore_db_cluster_to_point_in_time(
2863        &self,
2864        request: &AwsRequest,
2865    ) -> Result<AwsResponse, AwsServiceError> {
2866        use serde_json::json;
2867        let target = required_query_param(request, "DBClusterIdentifier")?;
2868        let source = required_query_param(request, "SourceDBClusterIdentifier")?;
2869        let arn = format!(
2870            "arn:aws:rds:{}:{}:cluster:{}",
2871            request.region, request.account_id, target
2872        );
2873
2874        let writer_info = {
2875            let accounts = self.state.read();
2876            accounts.get(&request.account_id).and_then(|state| {
2877                let cluster_entry = state.extras.get("clusters")?.get(&source)?;
2878                let writer_id = cluster_entry
2879                    .get("WriterDBInstanceIdentifier")
2880                    .and_then(|v| v.as_str())
2881                    .map(str::to_string)
2882                    .or_else(|| {
2883                        cluster_entry
2884                            .get("DBClusterMembers")
2885                            .and_then(|m| m.as_array())
2886                            .and_then(|arr| {
2887                                arr.iter()
2888                                    .find(|m| m["IsClusterWriter"].as_bool() == Some(true))
2889                                    .or_else(|| arr.first())
2890                                    .and_then(|m| m["DBInstanceIdentifier"].as_str())
2891                                    .map(str::to_string)
2892                            })
2893                    })?;
2894                let inst = state.instances.get(&writer_id)?;
2895                Some((
2896                    inst.db_instance_identifier.clone(),
2897                    inst.engine.clone(),
2898                    inst.master_username.clone(),
2899                    inst.master_user_password.clone(),
2900                    inst.db_name
2901                        .clone()
2902                        .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
2903                ))
2904            })
2905        };
2906
2907        let pending_dump_b64 = if let Some((wid, eng, user, pass, db)) = writer_info {
2908            if let Some(runtime) = self.runtime_ref() {
2909                match runtime.dump_database(&wid, &eng, &user, &pass, &db).await {
2910                    Ok(data) => {
2911                        use base64::Engine;
2912                        Some(base64::engine::general_purpose::STANDARD.encode(&data))
2913                    }
2914                    Err(e) => {
2915                        tracing::warn!(
2916                            error = %e,
2917                            cluster = %source,
2918                            writer = %wid,
2919                            "cluster PIT dump failed; falling back to metadata-only restore"
2920                        );
2921                        None
2922                    }
2923                }
2924            } else {
2925                None
2926            }
2927        } else {
2928            None
2929        };
2930
2931        let mut accounts = self.state.write();
2932        let state = accounts.get_or_create(&request.account_id);
2933        let mut entry = state
2934            .extras
2935            .get("clusters")
2936            .and_then(|m| m.get(&source))
2937            .cloned()
2938            .ok_or_else(|| {
2939                AwsServiceError::aws_error(
2940                    StatusCode::NOT_FOUND,
2941                    "DBClusterNotFoundFault",
2942                    format!("DBCluster {source} not found."),
2943                )
2944            })?;
2945        if let Some(obj) = entry.as_object_mut() {
2946            obj.insert("DBClusterIdentifier".to_string(), json!(target));
2947            obj.insert("DBClusterArn".to_string(), json!(arn));
2948            obj.insert("Status".to_string(), json!("available"));
2949            obj.insert(
2950                "Endpoint".to_string(),
2951                json!(format!(
2952                    "{target}.cluster-xxx.{}.rds.amazonaws.com",
2953                    request.region
2954                )),
2955            );
2956            obj.insert(
2957                "ReaderEndpoint".to_string(),
2958                json!(format!(
2959                    "{target}.cluster-ro-xxx.{}.rds.amazonaws.com",
2960                    request.region
2961                )),
2962            );
2963            obj.remove("DBClusterMembers");
2964            obj.remove("WriterDBInstanceIdentifier");
2965            if let Some(restore_time) = optional_query_param(request, "RestoreToTime") {
2966                obj.insert("RestoreToTime".to_string(), json!(restore_time));
2967            }
2968            if let Some(latest) = optional_query_param(request, "UseLatestRestorableTime") {
2969                obj.insert("UseLatestRestorableTime".to_string(), json!(latest));
2970            }
2971            if let Some(b64) = pending_dump_b64 {
2972                obj.insert("PendingRestoreDumpB64".to_string(), json!(b64));
2973            }
2974        }
2975        state
2976            .extras
2977            .entry("clusters".to_string())
2978            .or_default()
2979            .insert(target.clone(), entry);
2980        drop(accounts);
2981
2982        self.emit_event(
2983            RdsSourceType::DbCluster,
2984            &target,
2985            &arn,
2986            "RDS-EVENT-0171",
2987            &["creation"],
2988            "DB cluster restored to point in time",
2989        );
2990
2991        Ok(AwsResponse::xml(
2992            StatusCode::OK,
2993            query_response_xml(
2994                "RestoreDBClusterToPointInTime",
2995                RDS_NS,
2996                &crate::extras::db_cluster_xml(&target, &arn),
2997                &request.request_id,
2998            ),
2999        ))
3000    }
3001
3002    async fn describe_db_log_files(
3003        &self,
3004        request: &AwsRequest,
3005    ) -> Result<AwsResponse, AwsServiceError> {
3006        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
3007        let filename_contains = optional_query_param(request, "FilenameContains");
3008        let file_last_written =
3009            optional_query_param(request, "FileLastWritten").and_then(|s| s.parse::<i64>().ok());
3010        let file_size =
3011            optional_query_param(request, "FileSize").and_then(|s| s.parse::<i64>().ok());
3012
3013        let engine = {
3014            let accounts = self.state.read();
3015            let state = accounts
3016                .get(&request.account_id)
3017                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3018            let instance = state
3019                .instances
3020                .get(&db_instance_identifier)
3021                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3022            instance.engine.clone()
3023        };
3024
3025        // Synthetic log catalogue. Real RDS exposes per-engine log
3026        // files in `error/` and `trace/` (and `audit/` for some
3027        // engines) — we publish two well-known names so SDK callers
3028        // get a non-empty listing even when the runtime can't reach
3029        // the live container yet.
3030        let now_millis = Utc::now().timestamp_millis();
3031        let candidates: Vec<(String, i64, i64)> = match engine.as_str() {
3032            "mysql" | "mariadb" => vec![
3033                ("error/mysql-error.log".to_string(), now_millis, 1024),
3034                ("slowquery/mysql-slowquery.log".to_string(), now_millis, 512),
3035            ],
3036            _ => vec![
3037                ("error/postgres.log".to_string(), now_millis, 1024),
3038                ("trace/postgres-trace.log".to_string(), now_millis, 512),
3039            ],
3040        };
3041
3042        let filtered: Vec<(String, i64, i64)> = candidates
3043            .into_iter()
3044            .filter(|(name, written, size)| {
3045                if let Some(needle) = &filename_contains {
3046                    if !name.contains(needle) {
3047                        return false;
3048                    }
3049                }
3050                if let Some(min_written) = file_last_written {
3051                    // FileLastWritten is documented in epoch seconds; our
3052                    // synthetic timestamps are in millis, so compare
3053                    // against the seconds form.
3054                    if *written / 1000 <= min_written {
3055                        return false;
3056                    }
3057                }
3058                if let Some(min_size) = file_size {
3059                    if *size < min_size {
3060                        return false;
3061                    }
3062                }
3063                true
3064            })
3065            .collect();
3066
3067        let details: String = filtered
3068            .iter()
3069            .map(|(name, written, size)| {
3070                format!(
3071                    "<DescribeDBLogFilesDetails>\
3072                     <LogFileName>{}</LogFileName>\
3073                     <LastWritten>{}</LastWritten>\
3074                     <Size>{}</Size>\
3075                     </DescribeDBLogFilesDetails>",
3076                    xml_escape(name),
3077                    written,
3078                    size,
3079                )
3080            })
3081            .collect();
3082
3083        Ok(AwsResponse::xml(
3084            StatusCode::OK,
3085            query_response_xml(
3086                "DescribeDBLogFiles",
3087                RDS_NS,
3088                &format!("<DescribeDBLogFiles>{details}</DescribeDBLogFiles>"),
3089                &request.request_id,
3090            ),
3091        ))
3092    }
3093
3094    async fn download_db_log_file_portion(
3095        &self,
3096        request: &AwsRequest,
3097    ) -> Result<AwsResponse, AwsServiceError> {
3098        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
3099        let log_file_name = required_query_param(request, "LogFileName")?;
3100        let _marker = optional_query_param(request, "Marker").unwrap_or_else(|| "0".to_string());
3101        let _number_of_lines = optional_query_param(request, "NumberOfLines")
3102            .and_then(|s| s.parse::<i64>().ok())
3103            .unwrap_or(0);
3104
3105        let engine = {
3106            let accounts = self.state.read();
3107            let state = accounts
3108                .get(&request.account_id)
3109                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3110            let instance = state
3111                .instances
3112                .get(&db_instance_identifier)
3113                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3114            instance.engine.clone()
3115        };
3116
3117        let known_synthetic = matches!(
3118            (engine.as_str(), log_file_name.as_str()),
3119            ("mysql" | "mariadb", "error/mysql-error.log")
3120                | ("mysql" | "mariadb", "slowquery/mysql-slowquery.log")
3121                | (_, "error/postgres.log")
3122                | (_, "trace/postgres-trace.log")
3123        );
3124
3125        let container_path = map_log_file_to_container_path(&engine, &log_file_name);
3126
3127        let log_data = if let Some(runtime) = self.runtime.as_ref() {
3128            match runtime
3129                .read_log_file(&db_instance_identifier, &container_path)
3130                .await
3131            {
3132                Ok(bytes) => Some(bytes),
3133                Err(RuntimeError::Unavailable) => None,
3134                Err(RuntimeError::ContainerStartFailed(_)) if known_synthetic => Some(Vec::new()),
3135                Err(RuntimeError::ContainerStartFailed(message)) => {
3136                    return Err(AwsServiceError::aws_error(
3137                        StatusCode::NOT_FOUND,
3138                        "DBLogFileNotFoundFault",
3139                        format!("DBLogFile {log_file_name} not found: {message}"),
3140                    ));
3141                }
3142            }
3143        } else if known_synthetic {
3144            Some(Vec::new())
3145        } else {
3146            None
3147        };
3148
3149        let log_data = match log_data {
3150            Some(bytes) => bytes,
3151            None => {
3152                return Err(AwsServiceError::aws_error(
3153                    StatusCode::NOT_FOUND,
3154                    "DBLogFileNotFoundFault",
3155                    format!("DBLogFile {log_file_name} not found"),
3156                ))
3157            }
3158        };
3159
3160        let payload = String::from_utf8_lossy(&log_data).into_owned();
3161        let total_bytes = payload.len();
3162
3163        Ok(AwsResponse::xml(
3164            StatusCode::OK,
3165            query_response_xml(
3166                "DownloadDBLogFilePortion",
3167                RDS_NS,
3168                &format!(
3169                    "<LogFileData>{}</LogFileData>\
3170                     <Marker>{}</Marker>\
3171                     <AdditionalDataPending>false</AdditionalDataPending>",
3172                    xml_escape(&payload),
3173                    total_bytes,
3174                ),
3175                &request.request_id,
3176            ),
3177        ))
3178    }
3179
3180    fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3181        let db_subnet_group_name = required_query_param(request, "DBSubnetGroupName")?;
3182        let db_subnet_group_description =
3183            required_query_param(request, "DBSubnetGroupDescription")?;
3184        let subnet_ids = parse_subnet_ids(request)?;
3185
3186        if subnet_ids.is_empty() {
3187            return Err(AwsServiceError::aws_error(
3188                StatusCode::BAD_REQUEST,
3189                "InvalidParameterValue",
3190                "At least one subnet must be specified.",
3191            ));
3192        }
3193
3194        if subnet_ids.len() < 2 {
3195            return Err(AwsServiceError::aws_error(
3196                StatusCode::BAD_REQUEST,
3197                "DBSubnetGroupDoesNotCoverEnoughAZs",
3198                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3199            ));
3200        }
3201
3202        let mut accounts = self.state.write();
3203        let state = accounts.get_or_create(&request.account_id);
3204
3205        if state.subnet_groups.contains_key(&db_subnet_group_name) {
3206            return Err(AwsServiceError::aws_error(
3207                StatusCode::CONFLICT,
3208                "DBSubnetGroupAlreadyExists",
3209                format!("DBSubnetGroup {db_subnet_group_name} already exists."),
3210            ));
3211        }
3212
3213        let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
3214        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
3215            .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
3216            .collect();
3217
3218        // Validate that subnets span at least 2 unique Availability Zones
3219        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
3220        if unique_azs.len() < 2 {
3221            return Err(AwsServiceError::aws_error(
3222                StatusCode::BAD_REQUEST,
3223                "DBSubnetGroupDoesNotCoverEnoughAZs",
3224                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3225            ));
3226        }
3227
3228        let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
3229        let tags = parse_tags(request)?;
3230
3231        let subnet_group = DbSubnetGroup {
3232            db_subnet_group_name: db_subnet_group_name.clone(),
3233            db_subnet_group_arn,
3234            db_subnet_group_description,
3235            vpc_id,
3236            subnet_ids,
3237            subnet_availability_zones,
3238            tags,
3239        };
3240
3241        state
3242            .subnet_groups
3243            .insert(db_subnet_group_name, subnet_group.clone());
3244
3245        Ok(AwsResponse::xml(
3246            StatusCode::OK,
3247            query_response_xml(
3248                "CreateDBSubnetGroup",
3249                RDS_NS,
3250                &format!(
3251                    "<DBSubnetGroup>{}</DBSubnetGroup>",
3252                    db_subnet_group_xml(&subnet_group)
3253                ),
3254                &request.request_id,
3255            ),
3256        ))
3257    }
3258
3259    fn describe_db_subnet_groups(
3260        &self,
3261        request: &AwsRequest,
3262    ) -> Result<AwsResponse, AwsServiceError> {
3263        let db_subnet_group_name = optional_query_param(request, "DBSubnetGroupName");
3264        let marker = optional_query_param(request, "Marker");
3265        let max_records = optional_query_param(request, "MaxRecords");
3266
3267        let accounts = self.state.read();
3268        let empty = RdsState::new(&request.account_id, &request.region);
3269        let state = accounts.get(&request.account_id).unwrap_or(&empty);
3270
3271        // If specific subnet group requested, return just that one (no pagination)
3272        if let Some(name) = db_subnet_group_name {
3273            let sg = state.subnet_groups.get(&name).ok_or_else(|| {
3274                AwsServiceError::aws_error(
3275                    StatusCode::NOT_FOUND,
3276                    "DBSubnetGroupNotFoundFault",
3277                    format!("DBSubnetGroup {} not found.", name),
3278                )
3279            })?;
3280
3281            return Ok(AwsResponse::xml(
3282                StatusCode::OK,
3283                query_response_xml(
3284                    "DescribeDBSubnetGroups",
3285                    RDS_NS,
3286                    &format!(
3287                        "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
3288                        db_subnet_group_xml(sg)
3289                    ),
3290                    &request.request_id,
3291                ),
3292            ));
3293        }
3294
3295        // Get all subnet groups sorted by name
3296        let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
3297        subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
3298
3299        // Apply pagination
3300        let paginated = paginate(subnet_groups, marker, max_records, |sg| {
3301            &sg.db_subnet_group_name
3302        })?;
3303
3304        let marker_xml = paginated
3305            .next_marker
3306            .as_ref()
3307            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
3308            .unwrap_or_default();
3309
3310        let body = paginated
3311            .items
3312            .iter()
3313            .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
3314            .collect::<Vec<_>>()
3315            .join("");
3316
3317        Ok(AwsResponse::xml(
3318            StatusCode::OK,
3319            query_response_xml(
3320                "DescribeDBSubnetGroups",
3321                RDS_NS,
3322                &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
3323                &request.request_id,
3324            ),
3325        ))
3326    }
3327
3328    fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3329        let db_subnet_group_name = required_query_param(request, "DBSubnetGroupName")?;
3330
3331        let mut accounts = self.state.write();
3332        let state = accounts.get_or_create(&request.account_id);
3333
3334        if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
3335            return Err(AwsServiceError::aws_error(
3336                StatusCode::NOT_FOUND,
3337                "DBSubnetGroupNotFoundFault",
3338                format!("DBSubnetGroup {db_subnet_group_name} not found."),
3339            ));
3340        }
3341
3342        Ok(AwsResponse::xml(
3343            StatusCode::OK,
3344            query_response_xml("DeleteDBSubnetGroup", RDS_NS, "", &request.request_id),
3345        ))
3346    }
3347
3348    fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3349        let db_subnet_group_name = required_query_param(request, "DBSubnetGroupName")?;
3350        let subnet_ids = parse_subnet_ids(request)?;
3351
3352        if subnet_ids.is_empty() {
3353            return Err(AwsServiceError::aws_error(
3354                StatusCode::BAD_REQUEST,
3355                "InvalidParameterValue",
3356                "At least one subnet must be specified.",
3357            ));
3358        }
3359
3360        if subnet_ids.len() < 2 {
3361            return Err(AwsServiceError::aws_error(
3362                StatusCode::BAD_REQUEST,
3363                "DBSubnetGroupDoesNotCoverEnoughAZs",
3364                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3365            ));
3366        }
3367
3368        let mut accounts = self.state.write();
3369        let state = accounts.get_or_create(&request.account_id);
3370
3371        let region = state.region.clone();
3372
3373        let subnet_group = state
3374            .subnet_groups
3375            .get_mut(&db_subnet_group_name)
3376            .ok_or_else(|| {
3377                AwsServiceError::aws_error(
3378                    StatusCode::NOT_FOUND,
3379                    "DBSubnetGroupNotFoundFault",
3380                    format!("DBSubnetGroup {db_subnet_group_name} not found."),
3381                )
3382            })?;
3383
3384        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
3385            .map(|i| format!("{}{}", &region, char::from(b'a' + (i % 6) as u8)))
3386            .collect();
3387
3388        // Validate that subnets span at least 2 unique Availability Zones
3389        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
3390        if unique_azs.len() < 2 {
3391            return Err(AwsServiceError::aws_error(
3392                StatusCode::BAD_REQUEST,
3393                "DBSubnetGroupDoesNotCoverEnoughAZs",
3394                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3395            ));
3396        }
3397
3398        subnet_group.subnet_ids = subnet_ids;
3399        subnet_group.subnet_availability_zones = subnet_availability_zones;
3400
3401        let subnet_group_clone = subnet_group.clone();
3402
3403        Ok(AwsResponse::xml(
3404            StatusCode::OK,
3405            query_response_xml(
3406                "ModifyDBSubnetGroup",
3407                RDS_NS,
3408                &format!(
3409                    "<DBSubnetGroup>{}</DBSubnetGroup>",
3410                    db_subnet_group_xml(&subnet_group_clone)
3411                ),
3412                &request.request_id,
3413            ),
3414        ))
3415    }
3416
3417    fn create_db_parameter_group(
3418        &self,
3419        request: &AwsRequest,
3420    ) -> Result<AwsResponse, AwsServiceError> {
3421        let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
3422        let db_parameter_group_family = required_query_param(request, "DBParameterGroupFamily")?;
3423        let description = required_query_param(request, "Description")?;
3424
3425        // Validate parameter group family against supported engines and versions
3426        let valid_families = [
3427            "postgres16",
3428            "postgres15",
3429            "postgres14",
3430            "postgres13",
3431            "mysql8.0",
3432            "mysql5.7",
3433            "mariadb10.11",
3434            "mariadb10.6",
3435        ];
3436
3437        if !valid_families.contains(&db_parameter_group_family.as_str()) {
3438            return Err(AwsServiceError::aws_error(
3439                StatusCode::BAD_REQUEST,
3440                "InvalidParameterValue",
3441                format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
3442            ));
3443        }
3444
3445        let mut accounts = self.state.write();
3446        let state = accounts.get_or_create(&request.account_id);
3447
3448        if state
3449            .parameter_groups
3450            .contains_key(&db_parameter_group_name)
3451        {
3452            return Err(AwsServiceError::aws_error(
3453                StatusCode::CONFLICT,
3454                "DBParameterGroupAlreadyExists",
3455                format!("DBParameterGroup {db_parameter_group_name} already exists."),
3456            ));
3457        }
3458
3459        let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
3460        let tags = parse_tags(request)?;
3461
3462        let parameter_group = DbParameterGroup {
3463            db_parameter_group_name: db_parameter_group_name.clone(),
3464            db_parameter_group_arn,
3465            db_parameter_group_family,
3466            description,
3467            parameters: std::collections::BTreeMap::new(),
3468            tags,
3469        };
3470
3471        state
3472            .parameter_groups
3473            .insert(db_parameter_group_name.clone(), parameter_group.clone());
3474        let arn = parameter_group.db_parameter_group_arn.clone();
3475        drop(accounts);
3476
3477        self.emit_event(
3478            RdsSourceType::DbParameterGroup,
3479            &db_parameter_group_name,
3480            &arn,
3481            "RDS-EVENT-0179",
3482            &["creation"],
3483            "DB parameter group created",
3484        );
3485
3486        Ok(AwsResponse::xml(
3487            StatusCode::OK,
3488            query_response_xml(
3489                "CreateDBParameterGroup",
3490                RDS_NS,
3491                &format!(
3492                    "<DBParameterGroup>{}</DBParameterGroup>",
3493                    db_parameter_group_xml(&parameter_group)
3494                ),
3495                &request.request_id,
3496            ),
3497        ))
3498    }
3499
3500    fn describe_db_parameter_groups(
3501        &self,
3502        request: &AwsRequest,
3503    ) -> Result<AwsResponse, AwsServiceError> {
3504        let db_parameter_group_name = optional_query_param(request, "DBParameterGroupName");
3505        let marker = optional_query_param(request, "Marker");
3506        let max_records = optional_query_param(request, "MaxRecords");
3507
3508        let accounts = self.state.read();
3509        let empty = RdsState::new(&request.account_id, &request.region);
3510        let state = accounts.get(&request.account_id).unwrap_or(&empty);
3511
3512        // If specific parameter group requested, return just that one (no pagination)
3513        if let Some(name) = db_parameter_group_name {
3514            let pg = state.parameter_groups.get(&name).ok_or_else(|| {
3515                AwsServiceError::aws_error(
3516                    StatusCode::NOT_FOUND,
3517                    "DBParameterGroupNotFound",
3518                    format!("DBParameterGroup {} not found.", name),
3519                )
3520            })?;
3521
3522            return Ok(AwsResponse::xml(
3523                StatusCode::OK,
3524                query_response_xml(
3525                    "DescribeDBParameterGroups", RDS_NS,
3526                    &format!(
3527                        "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
3528                        db_parameter_group_xml(pg)
3529                    ),
3530                    &request.request_id,
3531                ),
3532            ));
3533        }
3534
3535        // Get all parameter groups sorted by name
3536        let mut parameter_groups: Vec<DbParameterGroup> =
3537            state.parameter_groups.values().cloned().collect();
3538        parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
3539
3540        // Apply pagination
3541        let paginated = paginate(parameter_groups, marker, max_records, |pg| {
3542            &pg.db_parameter_group_name
3543        })?;
3544
3545        let marker_xml = paginated
3546            .next_marker
3547            .as_ref()
3548            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
3549            .unwrap_or_default();
3550
3551        let body = paginated
3552            .items
3553            .iter()
3554            .map(|pg| {
3555                format!(
3556                    "<DBParameterGroup>{}</DBParameterGroup>",
3557                    db_parameter_group_xml(pg)
3558                )
3559            })
3560            .collect::<Vec<_>>()
3561            .join("");
3562
3563        Ok(AwsResponse::xml(
3564            StatusCode::OK,
3565            query_response_xml(
3566                "DescribeDBParameterGroups",
3567                RDS_NS,
3568                &format!(
3569                    "<DBParameterGroups>{}</DBParameterGroups>{}",
3570                    body, marker_xml
3571                ),
3572                &request.request_id,
3573            ),
3574        ))
3575    }
3576
3577    fn delete_db_parameter_group(
3578        &self,
3579        request: &AwsRequest,
3580    ) -> Result<AwsResponse, AwsServiceError> {
3581        let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
3582
3583        let arn = {
3584            let mut accounts = self.state.write();
3585            let state = accounts.get_or_create(&request.account_id);
3586
3587            if db_parameter_group_name.starts_with("default.") {
3588                return Err(AwsServiceError::aws_error(
3589                    StatusCode::BAD_REQUEST,
3590                    "InvalidParameterValue",
3591                    "Cannot delete default parameter groups.",
3592                ));
3593            }
3594
3595            let removed = state
3596                .parameter_groups
3597                .remove(&db_parameter_group_name)
3598                .ok_or_else(|| {
3599                    AwsServiceError::aws_error(
3600                        StatusCode::NOT_FOUND,
3601                        "DBParameterGroupNotFound",
3602                        format!("DBParameterGroup {db_parameter_group_name} not found."),
3603                    )
3604                })?;
3605            removed.db_parameter_group_arn
3606        };
3607
3608        self.emit_event(
3609            RdsSourceType::DbParameterGroup,
3610            &db_parameter_group_name,
3611            &arn,
3612            "RDS-EVENT-0064",
3613            &["deletion"],
3614            "DB parameter group deleted",
3615        );
3616
3617        Ok(AwsResponse::xml(
3618            StatusCode::OK,
3619            query_response_xml("DeleteDBParameterGroup", RDS_NS, "", &request.request_id),
3620        ))
3621    }
3622
3623    fn modify_db_parameter_group(
3624        &self,
3625        request: &AwsRequest,
3626    ) -> Result<AwsResponse, AwsServiceError> {
3627        let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
3628
3629        // Parse Parameters.member.N.{ParameterName,ParameterValue,ApplyMethod}
3630        // before taking the lock so we can validate input independently.
3631        // ApplyMethod is accepted (immediate vs pending-reboot) but the
3632        // single-state model applies all changes immediately.
3633        let parsed_params = parse_db_parameter_members(request);
3634
3635        let mut accounts = self.state.write();
3636        let state = accounts.get_or_create(&request.account_id);
3637
3638        let parameter_group = state
3639            .parameter_groups
3640            .get_mut(&db_parameter_group_name)
3641            .ok_or_else(|| {
3642                AwsServiceError::aws_error(
3643                    StatusCode::NOT_FOUND,
3644                    "DBParameterGroupNotFound",
3645                    format!("DBParameterGroup {db_parameter_group_name} not found."),
3646                )
3647            })?;
3648
3649        if let Some(new_description) = optional_query_param(request, "Description") {
3650            parameter_group.description = new_description;
3651        }
3652
3653        for (name, value) in parsed_params {
3654            parameter_group.parameters.insert(name, value);
3655        }
3656
3657        let parameter_group_clone = parameter_group.clone();
3658        let arn = parameter_group_clone.db_parameter_group_arn.clone();
3659        drop(accounts);
3660
3661        self.emit_event(
3662            RdsSourceType::DbParameterGroup,
3663            &db_parameter_group_name,
3664            &arn,
3665            "RDS-EVENT-0037",
3666            &["configuration change"],
3667            "DB parameter group modified",
3668        );
3669
3670        Ok(AwsResponse::xml(
3671            StatusCode::OK,
3672            query_response_xml(
3673                "ModifyDBParameterGroup",
3674                RDS_NS,
3675                &format!(
3676                    "<DBParameterGroupName>{}</DBParameterGroupName>",
3677                    xml_escape(&parameter_group_clone.db_parameter_group_name)
3678                ),
3679                &request.request_id,
3680            ),
3681        ))
3682    }
3683
3684    fn describe_db_parameters_real(
3685        &self,
3686        request: &AwsRequest,
3687    ) -> Result<AwsResponse, AwsServiceError> {
3688        let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
3689        let source_filter = optional_query_param(request, "Source");
3690
3691        let accounts = self.state.read();
3692        let state = match accounts.get(&request.account_id) {
3693            Some(s) => s,
3694            None => {
3695                return Err(AwsServiceError::aws_error(
3696                    StatusCode::NOT_FOUND,
3697                    "DBParameterGroupNotFound",
3698                    format!("DBParameterGroup {db_parameter_group_name} not found."),
3699                ));
3700            }
3701        };
3702        let parameter_group = state
3703            .parameter_groups
3704            .get(&db_parameter_group_name)
3705            .ok_or_else(|| {
3706                AwsServiceError::aws_error(
3707                    StatusCode::NOT_FOUND,
3708                    "DBParameterGroupNotFound",
3709                    format!("DBParameterGroup {db_parameter_group_name} not found."),
3710                )
3711            })?;
3712
3713        // Real RDS surfaces two parameter sources for a group:
3714        //   * `user` — values set via `ModifyDBParameterGroup`.
3715        //   * `engine-default` — baseline values inherited from the
3716        //     parameter group family (e.g. `postgres16`).
3717        // With no `Source` filter we return both, mirroring AWS. When a
3718        // user value shadows an engine default we skip the default so
3719        // each parameter appears exactly once.
3720        let source = source_filter.as_deref();
3721        let include_user = source.is_none_or(|s| s == "user");
3722        let include_engine_default = source.is_none_or(|s| s == "engine-default");
3723        let mut members_xml = String::new();
3724        if include_user {
3725            for (name, value) in &parameter_group.parameters {
3726                members_xml.push_str(&render_user_parameter_xml(name, value));
3727            }
3728        }
3729        if include_engine_default {
3730            // A user override flips a parameter's effective source from
3731            // `engine-default` to `user`, so engine-default views always
3732            // skip parameters the user has modified — even when the
3733            // caller asks only for `engine-default`.
3734            for default in
3735                crate::state::engine_default_parameters(&parameter_group.db_parameter_group_family)
3736            {
3737                if parameter_group.parameters.contains_key(default.name) {
3738                    continue;
3739                }
3740                members_xml.push_str(&render_engine_default_parameter_xml(default));
3741            }
3742        }
3743        let body = format!("    <Parameters>\n{members_xml}    </Parameters>");
3744        Ok(AwsResponse::xml(
3745            StatusCode::OK,
3746            query_response_xml("DescribeDBParameters", RDS_NS, &body, &request.request_id),
3747        ))
3748    }
3749}
3750
3751/// Render a single user-set parameter as the XML shape AWS emits inside
3752/// `DescribeDB(Cluster)Parameters` responses. We don't store metadata
3753/// alongside user values so we report `dynamic`/`string` defaults.
3754pub(crate) fn render_user_parameter_xml(name: &str, value: &str) -> String {
3755    format!(
3756        "      <Parameter>\n        <ParameterName>{}</ParameterName>\n        <ParameterValue>{}</ParameterValue>\n        <Source>user</Source>\n        <ApplyType>dynamic</ApplyType>\n        <DataType>string</DataType>\n        <IsModifiable>true</IsModifiable>\n      </Parameter>\n",
3757        xml_escape(name),
3758        xml_escape(value),
3759    )
3760}
3761
3762/// Render a single engine-default parameter as the XML shape AWS emits
3763/// inside `DescribeDB(Cluster)Parameters` and
3764/// `DescribeEngineDefault(Cluster)Parameters` responses.
3765pub(crate) fn render_engine_default_parameter_xml(
3766    default: &crate::state::EngineDefaultParameter,
3767) -> String {
3768    format!(
3769        "      <Parameter>\n        <ParameterName>{}</ParameterName>\n        <ParameterValue>{}</ParameterValue>\n        <Source>engine-default</Source>\n        <ApplyType>{}</ApplyType>\n        <DataType>{}</DataType>\n        <AllowedValues>{}</AllowedValues>\n        <IsModifiable>{}</IsModifiable>\n      </Parameter>\n",
3770        xml_escape(default.name),
3771        xml_escape(default.value),
3772        xml_escape(default.apply_type),
3773        xml_escape(default.data_type),
3774        xml_escape(default.allowed_values),
3775        default.is_modifiable,
3776    )
3777}
3778
3779/// Parse `Parameters.{Parameter|member}.N.{ParameterName,ParameterValue,ApplyMethod}`
3780/// from a Query-protocol request. AWS RDS uses `Parameters.Parameter.N`
3781/// (the `Parameter` list location name from the Smithy model); we also
3782/// accept the generic `Parameters.member.N` form so hand-built clients
3783/// using the default Query list shape keep working. Skips members
3784/// missing a name or value; ApplyMethod is accepted but ignored
3785/// (single-state model).
3786pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<(String, String)> {
3787    let mut out = Vec::new();
3788    for prefix in ["Parameters.Parameter", "Parameters.member"] {
3789        let mut index = 1;
3790        loop {
3791            let name_key = format!("{prefix}.{index}.ParameterName");
3792            let value_key = format!("{prefix}.{index}.ParameterValue");
3793            let name = optional_query_param(request, &name_key);
3794            let value = optional_query_param(request, &value_key);
3795            if name.is_none() && value.is_none() {
3796                break;
3797            }
3798            if let (Some(n), Some(v)) = (name, value) {
3799                if !n.is_empty() {
3800                    out.push((n, v));
3801                }
3802            }
3803            index += 1;
3804        }
3805    }
3806    out
3807}
3808
3809/// Resolve an AWS-shaped log file name (e.g. `error/postgres.log`) to
3810/// the absolute path inside the running container. Unknown names fall
3811/// through as-is so callers can also fetch arbitrary paths.
3812fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
3813    match (engine, log_file_name) {
3814        (_, "error/postgres.log") => "/var/log/postgresql/postgresql.log".to_string(),
3815        (_, "trace/postgres-trace.log") => "/var/log/postgresql/postgresql.log".to_string(),
3816        ("mysql" | "mariadb", "error/mysql-error.log") => "/var/log/mysql/error.log".to_string(),
3817        ("mysql" | "mariadb", "slowquery/mysql-slowquery.log") => {
3818            "/var/log/mysql/slow.log".to_string()
3819        }
3820        _ => log_file_name.to_string(),
3821    }
3822}
3823
3824pub(crate) struct PaginationResult<T> {
3825    items: Vec<T>,
3826    next_marker: Option<String>,
3827}
3828
3829/// Attach `instance_id` to the cluster's `DBClusterMembers` array,
3830/// promoting it to writer when the cluster has none. Idempotent:
3831/// re-attaching an existing member is a no-op.
3832fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
3833    use serde_json::{json, Value};
3834    let Some(map) = state.extras.get_mut("clusters") else {
3835        return;
3836    };
3837    let Some(entry) = map.get_mut(cluster_id) else {
3838        return;
3839    };
3840    let Some(obj) = entry.as_object_mut() else {
3841        return;
3842    };
3843    let mut members: Vec<Value> = obj
3844        .get("DBClusterMembers")
3845        .and_then(|v| v.as_array())
3846        .cloned()
3847        .unwrap_or_default();
3848    if members
3849        .iter()
3850        .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
3851    {
3852        return;
3853    }
3854    let has_writer = members
3855        .iter()
3856        .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
3857    let promotion_tier = (members.len() as i64) + 1;
3858    members.push(json!({
3859        "DBInstanceIdentifier": instance_id,
3860        "IsClusterWriter": !has_writer,
3861        "DBClusterParameterGroupStatus": "in-sync",
3862        "PromotionTier": promotion_tier,
3863    }));
3864    obj.insert("DBClusterMembers".to_string(), Value::Array(members));
3865    if !has_writer {
3866        obj.insert(
3867            "WriterDBInstanceIdentifier".to_string(),
3868            Value::String(instance_id.to_string()),
3869        );
3870    }
3871}
3872
3873#[path = "service_helpers.rs"]
3874mod service_helpers;
3875pub(crate) use service_helpers::*;
3876
3877#[cfg(test)]
3878#[path = "service_tests.rs"]
3879mod tests;