// fakecloud_rds/service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20    SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
/// XML namespace handed to `query_response_xml` for the RDS responses built
/// in this file.
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
/// Action names returned by `supported_actions` on the `AwsService` impl for
/// `RdsService`.
///
/// NOTE(review): `handle` dispatches only some of these by name; every other
/// action falls through to `handle_extra_action` — confirm each entry is
/// actually served there.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddRoleToDBCluster",
    "AddRoleToDBInstance",
    "AddSourceIdentifierToSubscription",
    "AddTagsToResource",
    "ApplyPendingMaintenanceAction",
    "AuthorizeDBSecurityGroupIngress",
    "BacktrackDBCluster",
    "CancelExportTask",
    "CopyDBClusterParameterGroup",
    "CopyDBClusterSnapshot",
    "CopyDBParameterGroup",
    "CopyDBSnapshot",
    "CopyOptionGroup",
    "CreateBlueGreenDeployment",
    "CreateCustomDBEngineVersion",
    "CreateDBCluster",
    "CreateDBClusterEndpoint",
    "CreateDBClusterParameterGroup",
    "CreateDBClusterSnapshot",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBProxy",
    "CreateDBProxyEndpoint",
    "CreateDBSecurityGroup",
    "CreateDBShardGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "CreateEventSubscription",
    "CreateGlobalCluster",
    "CreateIntegration",
    "CreateOptionGroup",
    "CreateTenantDatabase",
    "DeleteBlueGreenDeployment",
    "DeleteCustomDBEngineVersion",
    "DeleteDBCluster",
    "DeleteDBClusterAutomatedBackup",
    "DeleteDBClusterEndpoint",
    "DeleteDBClusterParameterGroup",
    "DeleteDBClusterSnapshot",
    "DeleteDBInstance",
    "DeleteDBInstanceAutomatedBackup",
    "DeleteDBParameterGroup",
    "DeleteDBProxy",
    "DeleteDBProxyEndpoint",
    "DeleteDBSecurityGroup",
    "DeleteDBShardGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DeleteEventSubscription",
    "DeleteGlobalCluster",
    "DeleteIntegration",
    "DeleteOptionGroup",
    "DeleteTenantDatabase",
    "DeregisterDBProxyTargets",
    "DescribeAccountAttributes",
    "DescribeBlueGreenDeployments",
    "DescribeCertificates",
    "DescribeDBClusterAutomatedBackups",
    "DescribeDBClusterBacktracks",
    "DescribeDBClusterEndpoints",
    "DescribeDBClusterParameterGroups",
    "DescribeDBClusterParameters",
    "DescribeDBClusterSnapshotAttributes",
    "DescribeDBClusterSnapshots",
    "DescribeDBClusters",
    "DescribeDBEngineVersions",
    "DescribeDBInstanceAutomatedBackups",
    "DescribeDBInstances",
    "DescribeDBLogFiles",
    "DescribeDBMajorEngineVersions",
    "DescribeDBParameterGroups",
    "DescribeDBParameters",
    "DescribeDBProxies",
    "DescribeDBProxyEndpoints",
    "DescribeDBProxyTargetGroups",
    "DescribeDBProxyTargets",
    "DescribeDBRecommendations",
    "DescribeDBSecurityGroups",
    "DescribeDBShardGroups",
    "DescribeDBSnapshotAttributes",
    "DescribeDBSnapshotTenantDatabases",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeEngineDefaultClusterParameters",
    "DescribeEngineDefaultParameters",
    "DescribeEventCategories",
    "DescribeEventSubscriptions",
    "DescribeEvents",
    "DescribeExportTasks",
    "DescribeGlobalClusters",
    "DescribeIntegrations",
    "DescribeOptionGroupOptions",
    "DescribeOptionGroups",
    "DescribeOrderableDBInstanceOptions",
    "DescribePendingMaintenanceActions",
    "DescribeReservedDBInstances",
    "DescribeReservedDBInstancesOfferings",
    "DescribeSourceRegions",
    "DescribeTenantDatabases",
    "DescribeValidDBInstanceModifications",
    "DisableHttpEndpoint",
    "DownloadDBLogFilePortion",
    "EnableHttpEndpoint",
    "FailoverDBCluster",
    "FailoverGlobalCluster",
    "ListTagsForResource",
    "ModifyActivityStream",
    "ModifyCertificates",
    "ModifyCurrentDBClusterCapacity",
    "ModifyCustomDBEngineVersion",
    "ModifyDBCluster",
    "ModifyDBClusterEndpoint",
    "ModifyDBClusterParameterGroup",
    "ModifyDBClusterSnapshotAttribute",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBProxy",
    "ModifyDBProxyEndpoint",
    "ModifyDBProxyTargetGroup",
    "ModifyDBRecommendation",
    "ModifyDBShardGroup",
    "ModifyDBSnapshot",
    "ModifyDBSnapshotAttribute",
    "ModifyDBSubnetGroup",
    "ModifyEventSubscription",
    "ModifyGlobalCluster",
    "ModifyIntegration",
    "ModifyOptionGroup",
    "ModifyTenantDatabase",
    "PromoteReadReplica",
    "PromoteReadReplicaDBCluster",
    "PurchaseReservedDBInstancesOffering",
    "RebootDBCluster",
    "RebootDBInstance",
    "RebootDBShardGroup",
    "RegisterDBProxyTargets",
    "RemoveFromGlobalCluster",
    "RemoveRoleFromDBCluster",
    "RemoveRoleFromDBInstance",
    "RemoveSourceIdentifierFromSubscription",
    "RemoveTagsFromResource",
    "ResetDBClusterParameterGroup",
    "ResetDBParameterGroup",
    "RestoreDBClusterFromS3",
    "RestoreDBClusterFromSnapshot",
    "RestoreDBClusterToPointInTime",
    "RestoreDBInstanceFromDBSnapshot",
    "RestoreDBInstanceFromS3",
    "RestoreDBInstanceToPointInTime",
    "RevokeDBSecurityGroupIngress",
    "StartActivityStream",
    "StartDBCluster",
    "StartDBInstance",
    "StartDBInstanceAutomatedBackupsReplication",
    "StartExportTask",
    "StopActivityStream",
    "StopDBCluster",
    "StopDBInstance",
    "StopDBInstanceAutomatedBackupsReplication",
    "SwitchoverBlueGreenDeployment",
    "SwitchoverGlobalCluster",
    "SwitchoverReadReplica",
];
190
/// RDS service handler: the shared state plus optional collaborators that
/// are attached through the `with_*` builder methods.
pub struct RdsService {
    /// Shared RDS state read and written by the request handlers.
    pub(crate) state: SharedRdsState,
    /// Container runtime; `None` until `with_runtime` is called.
    runtime: Option<Arc<RdsRuntime>>,
    /// Snapshot persistence backend; snapshot saving is skipped when `None`.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held while a snapshot is serialized and written.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Delivery bus handed to the event emitters; `None` until
    /// `with_delivery_bus` is called.
    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
}
198
/// Kind of RDS resource an `aws.rds` EventBridge event is about. Each
/// variant also determines the event's detail-type string.
#[derive(Clone, Copy)]
#[allow(dead_code, clippy::enum_variant_names)]
pub(crate) enum RdsSourceType {
    DbInstance,
    DbSnapshot,
    DbParameterGroup,
    DbCluster,
    DbClusterSnapshot,
}

impl RdsSourceType {
    /// The three wire spellings of a variant, kept side by side so they
    /// cannot drift apart:
    /// `(EventBridge SourceType, DescribeEvents SourceType, detail-type)`.
    fn wire_names(self) -> (&'static str, &'static str, &'static str) {
        match self {
            Self::DbInstance => ("DB_INSTANCE", "db-instance", "RDS DB Instance Event"),
            Self::DbSnapshot => ("DB_SNAPSHOT", "db-snapshot", "RDS DB Snapshot Event"),
            Self::DbParameterGroup => (
                "DB_PARAMETER_GROUP",
                "db-parameter-group",
                "RDS DB Parameter Group Event",
            ),
            Self::DbCluster => ("DB_CLUSTER", "db-cluster", "RDS DB Cluster Event"),
            Self::DbClusterSnapshot => (
                "DB_CLUSTER_SNAPSHOT",
                "db-cluster-snapshot",
                "RDS DB Cluster Snapshot Event",
            ),
        }
    }

    /// EventBridge `SourceType` string: the SCREAMING_SNAKE form AWS
    /// publishes in the `aws.rds` event detail.
    fn as_str(self) -> &'static str {
        let (event_bridge, _, _) = self.wire_names();
        event_bridge
    }

    /// `DescribeEvents` `SourceType` filter / response value. The AWS API
    /// spec uses the kebab-case form here (`db-instance`, `db-cluster`,
    /// `db-snapshot`, `db-parameter-group`, ...), which differs from the
    /// EventBridge value returned by [`Self::as_str`].
    pub(crate) fn describe_events_str(self) -> &'static str {
        let (_, describe_events, _) = self.wire_names();
        describe_events
    }

    /// EventBridge `detail-type` string for events of this source type.
    fn detail_type(self) -> &'static str {
        let (_, _, detail_type) = self.wire_names();
        detail_type
    }
}
247
impl RdsService {
    /// Borrow the shared RDS state handle owned by this service.
    pub(crate) fn state_handle(&self) -> &SharedRdsState {
        &self.state
    }
}
253
254impl RdsService {
    /// Build a service around `state` with no runtime, snapshot store, or
    /// delivery bus attached, and a fresh snapshot lock.
    pub fn new(state: SharedRdsState) -> Self {
        Self {
            state,
            runtime: None,
            snapshot_store: None,
            snapshot_lock: Arc::new(AsyncMutex::new(())),
            delivery_bus: None,
        }
    }
264
    /// Builder step: attach the container runtime and return the service.
    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
        self.runtime = Some(runtime);
        self
    }
269
    /// Crate-internal accessor for the optional runtime; needed by the
    /// extras handler so cluster snapshot/restore paths can dump and
    /// replay member databases via the live container runtime.
    ///
    /// Returns `None` when no runtime was attached.
    pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
        self.runtime.as_ref()
    }
276
    /// Builder step: attach the snapshot store used to persist state and
    /// return the service.
    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
        self.snapshot_store = Some(store);
        self
    }
281
    /// Builder step: attach the delivery bus passed to the event emitters
    /// and return the service.
    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
        self.delivery_bus = Some(bus);
        self
    }
286
287    /// Emit an `aws.rds` EventBridge event mirroring the AWS RDS event schema.
288    /// Also records into the per-account events ring so DescribeEvents
289    /// can serve the row. No-op for the EventBridge side when the bus
290    /// isn't wired (tests, minimal configs).
291    pub(crate) fn emit_event(
292        &self,
293        source_type: RdsSourceType,
294        source_identifier: &str,
295        source_arn: &str,
296        event_id: &str,
297        event_categories: &[&str],
298        message: &str,
299    ) {
300        // Source the account_id off the source_arn (segment 4) — that's
301        // the canonical ARN form for RDS resources.
302        let account_id = source_arn.split(':').nth(4).unwrap_or("");
303        emit_event_static_with_state(
304            self.delivery_bus.as_ref(),
305            Some(&self.state),
306            if account_id.is_empty() {
307                None
308            } else {
309                Some(account_id)
310            },
311            source_type,
312            source_identifier,
313            source_arn,
314            event_id,
315            event_categories,
316            message,
317        );
318    }
319
    /// Persist the current state through `save_snapshot_static`, passing
    /// clones of this service's state handle, optional store, and snapshot
    /// lock.
    async fn save_snapshot(&self) {
        save_snapshot_static(
            self.state.clone(),
            self.snapshot_store.clone(),
            self.snapshot_lock.clone(),
        )
        .await;
    }
328
329    /// Recreate the backing Docker/Podman containers for persisted DB
330    /// instances after a fakecloud restart. Without this, persistent
331    /// mode loads the row back into memory with `db_instance_status =
332    /// available` but the container is gone, so the endpoint is dead
333    /// (issue #1338). Each candidate is flipped to `starting`
334    /// synchronously, then a background task brings the container back
335    /// and flips it back to `available`. Instances persisted as
336    /// `stopped` are skipped — `StartDBInstance` revives those.
337    pub async fn recover_persisted_containers(&self) {
338        let Some(runtime) = self.runtime.clone() else {
339            return;
340        };
341
342        struct Pending {
343            account_id: String,
344            region: String,
345            id: String,
346            arn: String,
347            engine: String,
348            engine_version: String,
349            username: String,
350            password: String,
351            db_name: String,
352        }
353
354        let pending: Vec<Pending> = {
355            let mut accounts = self.state.write();
356            let mut out = Vec::new();
357            for (_, state) in accounts.iter_mut() {
358                let account_id = state.account_id.clone();
359                let region = state.region.clone();
360                for (id, inst) in state.instances.iter_mut() {
361                    if !matches!(
362                        inst.db_instance_status.as_str(),
363                        "available" | "starting" | "modifying" | "rebooting" | "backing-up"
364                    ) {
365                        continue;
366                    }
367                    inst.db_instance_status = "starting".to_string();
368                    out.push(Pending {
369                        account_id: account_id.clone(),
370                        region: region.clone(),
371                        id: id.clone(),
372                        arn: inst.db_instance_arn.clone(),
373                        engine: inst.engine.clone(),
374                        engine_version: inst.engine_version.clone(),
375                        username: inst.master_username.clone(),
376                        password: inst.master_user_password.clone(),
377                        db_name: inst
378                            .db_name
379                            .clone()
380                            .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
381                    });
382                }
383            }
384            out
385        };
386
387        if pending.is_empty() {
388            return;
389        }
390        tracing::info!(
391            count = pending.len(),
392            "recovering backing containers for persisted rds instances",
393        );
394
395        for p in pending {
396            let runtime = runtime.clone();
397            let state = self.state.clone();
398            let snapshot_store = self.snapshot_store.clone();
399            let snapshot_lock = self.snapshot_lock.clone();
400            let delivery_bus = self.delivery_bus.clone();
401            tokio::spawn(async move {
402                match runtime
403                    .ensure_postgres(
404                        &p.id,
405                        &p.engine,
406                        &p.engine_version,
407                        &p.username,
408                        &p.password,
409                        &p.db_name,
410                        &p.account_id,
411                        &p.region,
412                    )
413                    .await
414                {
415                    Ok(running) => {
416                        {
417                            let mut accounts = state.write();
418                            if let Some(s) = accounts.get_mut(&p.account_id) {
419                                if let Some(inst) = s.instances.get_mut(&p.id) {
420                                    inst.db_instance_status = "available".to_string();
421                                    inst.endpoint_address = "127.0.0.1".to_string();
422                                    inst.port = i32::from(running.host_port);
423                                    inst.host_port = running.host_port;
424                                    inst.container_id = running.container_id;
425                                }
426                            }
427                        }
428                        save_snapshot_static(
429                            state.clone(),
430                            snapshot_store.clone(),
431                            snapshot_lock.clone(),
432                        )
433                        .await;
434                        emit_event_static(
435                            delivery_bus.as_ref(),
436                            RdsSourceType::DbInstance,
437                            &p.id,
438                            &p.arn,
439                            "RDS-EVENT-0088",
440                            &["notification"],
441                            "DB instance restarted after fakecloud restart",
442                        );
443                    }
444                    Err(error) => {
445                        tracing::error!(
446                            %error,
447                            db_instance_identifier = %p.id,
448                            "failed to recover rds backing container after restart",
449                        );
450                        {
451                            let mut accounts = state.write();
452                            if let Some(s) = accounts.get_mut(&p.account_id) {
453                                if let Some(inst) = s.instances.get_mut(&p.id) {
454                                    inst.db_instance_status = "failed".to_string();
455                                }
456                            }
457                        }
458                        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
459                    }
460                }
461            });
462        }
463    }
464
465    /// Stop the backing container for `db_instance_identifier` and mark
466    /// the row `stopped`. Synchronous wrt the runtime so callers see a
467    /// real `stopped` status by the time the response goes back; mirrors
468    /// the AWS contract that `StartDBInstance`/`StopDBInstance` change
469    /// the visible status immediately.
470    async fn stop_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
471        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
472
473        // Look up the instance first so a missing identifier returns the
474        // declared `DBInstanceNotFoundFault` error rather than a 503 from a
475        // missing container runtime. Conformance probes hit Start/Stop with
476        // synthetic identifiers and expect the documented error shape.
477        let arn = {
478            let accounts = self.state.read();
479            let empty = RdsState::new(&request.account_id, &request.region);
480            let state = accounts.get(&request.account_id).unwrap_or(&empty);
481            state
482                .instances
483                .get(&db_instance_identifier)
484                .map(|i| i.db_instance_arn.clone())
485                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
486        };
487
488        if let Some(runtime) = self.runtime.as_ref() {
489            runtime.stop_container(&db_instance_identifier).await;
490        }
491
492        let instance = {
493            let mut accounts = self.state.write();
494            let state = accounts.get_or_create(&request.account_id);
495            let inst = state
496                .instances
497                .get_mut(&db_instance_identifier)
498                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
499            inst.db_instance_status = "stopped".to_string();
500            inst.container_id = String::new();
501            inst.clone()
502        };
503
504        self.emit_event(
505            RdsSourceType::DbInstance,
506            &db_instance_identifier,
507            &arn,
508            "RDS-EVENT-0089",
509            &["notification"],
510            "DB instance stopped",
511        );
512
513        Ok(AwsResponse::xml(
514            StatusCode::OK,
515            query_response_xml(
516                "StopDBInstance",
517                RDS_NS,
518                &format!(
519                    "<DBInstance>{}</DBInstance>",
520                    db_instance_xml(&instance, Some("stopped"))
521                ),
522                &request.request_id,
523            ),
524        ))
525    }
526
527    /// Restart a stopped DB instance: spin up the backing container and
528    /// flip the row back to `available`. Mirrors AWS `StartDBInstance`.
529    async fn start_db_instance(
530        &self,
531        request: &AwsRequest,
532    ) -> Result<AwsResponse, AwsServiceError> {
533        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
534
535        // Look up the instance first so a missing identifier returns the
536        // declared `DBInstanceNotFoundFault` error rather than a 503 from a
537        // missing container runtime.
538        let instance = {
539            let accounts = self.state.read();
540            let empty = RdsState::new(&request.account_id, &request.region);
541            let state = accounts.get(&request.account_id).unwrap_or(&empty);
542            state
543                .instances
544                .get(&db_instance_identifier)
545                .cloned()
546                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
547        };
548
549        // Flip to `starting` so concurrent DescribeDBInstances callers
550        // see the in-flight state.
551        {
552            let mut accounts = self.state.write();
553            let state = accounts.get_or_create(&request.account_id);
554            if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
555                inst.db_instance_status = "starting".to_string();
556            }
557        }
558
559        let running = if let Some(runtime) = self.runtime.as_ref() {
560            Some(
561                runtime
562                    .ensure_postgres(
563                        &db_instance_identifier,
564                        &instance.engine,
565                        &instance.engine_version,
566                        &instance.master_username,
567                        &instance.master_user_password,
568                        instance
569                            .db_name
570                            .as_deref()
571                            .unwrap_or(default_db_name(&instance.engine)),
572                        &request.account_id,
573                        &request.region,
574                    )
575                    .await
576                    .map_err(runtime_error_to_service_error)?,
577            )
578        } else {
579            None
580        };
581
582        let instance = {
583            let mut accounts = self.state.write();
584            let state = accounts.get_or_create(&request.account_id);
585            let inst = state
586                .instances
587                .get_mut(&db_instance_identifier)
588                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
589            inst.db_instance_status = "available".to_string();
590            inst.endpoint_address = "127.0.0.1".to_string();
591            if let Some(r) = running {
592                inst.port = i32::from(r.host_port);
593                inst.host_port = r.host_port;
594                inst.container_id = r.container_id;
595            }
596            inst.clone()
597        };
598
599        self.emit_event(
600            RdsSourceType::DbInstance,
601            &db_instance_identifier,
602            &instance.db_instance_arn,
603            "RDS-EVENT-0088",
604            &["notification"],
605            "DB instance started",
606        );
607
608        Ok(AwsResponse::xml(
609            StatusCode::OK,
610            query_response_xml(
611                "StartDBInstance",
612                RDS_NS,
613                &format!(
614                    "<DBInstance>{}</DBInstance>",
615                    db_instance_xml(&instance, None)
616                ),
617                &request.request_id,
618            ),
619        ))
620    }
621}
622
/// Whether the given AWS error code is in the AddTagsToResource Smithy
/// model's declared error set. Used so undeclared `*NotFound` codes
/// (OptionGroup, ParameterGroup, EventSubscription, SecurityGroup) get
/// swallowed into a no-op rather than surfaced as a non-modelled error.
///
/// The comparison is an exact, case-sensitive match on `code`.
fn is_declared_add_tags_not_found(code: &str) -> bool {
    matches!(
        code,
        "BlueGreenDeploymentNotFoundFault"
            | "DBClusterNotFoundFault"
            | "DBInstanceNotFound"
            | "DBProxyEndpointNotFoundFault"
            | "DBProxyNotFoundFault"
            | "DBProxyTargetGroupNotFoundFault"
            | "DBShardGroupNotFound"
            | "DBSnapshotNotFound"
            | "DBSnapshotTenantDatabaseNotFoundFault"
            | "IntegrationNotFoundFault"
            | "InvalidDBClusterEndpointStateFault"
            | "InvalidDBClusterStateFault"
            | "InvalidDBInstanceState"
            | "TenantDatabaseNotFound"
    )
}
649
650/// no store is configured (memory-mode runs).
651async fn save_snapshot_static(
652    state: SharedRdsState,
653    store: Option<Arc<dyn SnapshotStore>>,
654    lock: Arc<AsyncMutex<()>>,
655) {
656    let Some(store) = store else {
657        return;
658    };
659    let _guard = lock.lock().await;
660    let snapshot = RdsSnapshot {
661        schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
662        state: None,
663        accounts: Some(state.read().clone()),
664    };
665    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
666        let bytes = serde_json::to_vec(&snapshot)
667            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
668        store.save(&bytes)
669    })
670    .await;
671    match join {
672        Ok(Ok(())) => {}
673        Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
674        Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
675    }
676}
677
678impl RdsService {
679    /// Return the runtime or a ``ServiceUnavailable`` error if it was not configured.
680    ///
681    /// RDS operations that start, stop, or reach into a database container fail
682    /// with a consistent wire error when the daemon (Docker/Podman) is missing
683    /// rather than each caller restating the message.
684    /// Resolve the container runtime or return a declared error shape.
685    /// `InsufficientDBInstanceCapacity` is declared on every op that
686    /// calls `require_runtime` (Create/Modify/Restore* DB instance and
687    /// Read Replica), and is the closest Smithy-modelled analogue for
688    /// "fakecloud can't satisfy this DB request right now".
689    fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
690        self.runtime.as_ref().ok_or_else(|| {
691            AwsServiceError::aws_error(
692                StatusCode::SERVICE_UNAVAILABLE,
693                "InsufficientDBInstanceCapacity",
694                "Docker/Podman is required for RDS DB instances but is not available",
695            )
696        })
697    }
698}
699
700#[async_trait]
701impl AwsService for RdsService {
702    fn service_name(&self) -> &str {
703        "rds"
704    }
705
706    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
707        // Centralized Smithy-aligned validation. Returns the appropriate
708        // `MissingParameter` / `InvalidParameterValue` error before the
709        // per-action handler runs. Actions without a constraint entry
710        // fall through unchanged.
711        crate::validation::prevalidate(request.action.as_str(), &request)?;
712
713        let mutates = is_mutating_action(request.action.as_str());
714        let result = match request.action.as_str() {
715            "AddTagsToResource" => self.add_tags_to_resource(&request),
716            "CreateDBInstance" => self.create_db_instance(&request).await,
717            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
718            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
719            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
720            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
721            "DeleteDBInstance" => self.delete_db_instance(&request).await,
722            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
723            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
724            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
725            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
726            "DescribeDBInstances" => self.describe_db_instances(&request),
727            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
728            "DescribeDBParameters" => self.describe_db_parameters_real(&request),
729            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
730            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
731            "DescribeOrderableDBInstanceOptions" => {
732                self.describe_orderable_db_instance_options(&request)
733            }
734            "ListTagsForResource" => self.list_tags_for_resource(&request),
735            "ModifyDBInstance" => self.modify_db_instance(&request),
736            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
737            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
738            "RebootDBInstance" => self.reboot_db_instance(&request).await,
739            "StartDBInstance" => self.start_db_instance(&request).await,
740            "StopDBInstance" => self.stop_db_instance(&request).await,
741            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
742            "RestoreDBInstanceFromDBSnapshot" => {
743                self.restore_db_instance_from_db_snapshot(&request).await
744            }
745            "RestoreDBInstanceToPointInTime" => {
746                self.restore_db_instance_to_point_in_time(&request).await
747            }
748            "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
749            "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
750            "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
751            "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
752            "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
753            "RestoreDBClusterToPointInTime" => {
754                self.restore_db_cluster_to_point_in_time(&request).await
755            }
756            _ => self.handle_extra_action(&request),
757        };
758        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
759            self.save_snapshot().await;
760        }
761        result
762    }
763
764    fn supported_actions(&self) -> &[&str] {
765        SUPPORTED_ACTIONS
766    }
767}
768
769impl RdsService {
770    async fn create_db_instance(
771        &self,
772        request: &AwsRequest,
773    ) -> Result<AwsResponse, AwsServiceError> {
774        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
775        let db_instance_class = required_query_param(request, "DBInstanceClass")?;
776        let engine = required_query_param(request, "Engine")?;
777        // AllocatedStorage / MasterUsername / MasterUserPassword are NOT
778        // declared `@required` in the Smithy model — real AWS rejects
779        // them in `InvalidParameterCombination` form which we can't emit
780        // because that code is undeclared on this op. Pick reasonable
781        // defaults so probe inputs with only model-required fields land
782        // on the happy path; real misuse still fails on engine/class
783        // validation paths.
784        let allocated_storage = optional_i32_param(request, "AllocatedStorage")?
785            .filter(|v| *v > 0)
786            .unwrap_or(20);
787        let master_username =
788            optional_query_param(request, "MasterUsername").unwrap_or_else(|| "admin".to_string());
789        let master_user_password = optional_query_param(request, "MasterUserPassword")
790            .unwrap_or_else(|| "Password1!".to_string());
791        let db_name = optional_query_param(request, "DBName");
792        let engine_version =
793            optional_query_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
794        let publicly_accessible =
795            parse_optional_bool(optional_query_param(request, "PubliclyAccessible").as_deref())?
796                .unwrap_or(true);
797        let deletion_protection =
798            parse_optional_bool(optional_query_param(request, "DeletionProtection").as_deref())?
799                .unwrap_or(false);
800        let port = optional_i32_param(request, "Port")?
801            .unwrap_or_else(|| default_port_for_engine(&engine));
802        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
803
804        let db_parameter_group_name = optional_query_param(request, "DBParameterGroupName")
805            .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
806
807        let backup_retention_period =
808            optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
809        let preferred_backup_window = optional_query_param(request, "PreferredBackupWindow")
810            .unwrap_or_else(|| "03:00-04:00".to_string());
811        let option_group_name = optional_query_param(request, "OptionGroupName");
812        let multi_az = parse_optional_bool(optional_query_param(request, "MultiAZ").as_deref())?
813            .unwrap_or(false);
814        let availability_zone = optional_query_param(request, "AvailabilityZone");
815        let storage_type = optional_query_param(request, "StorageType");
816        let storage_encrypted =
817            parse_optional_bool(optional_query_param(request, "StorageEncrypted").as_deref())?
818                .unwrap_or(false);
819        let kms_key_id = optional_query_param(request, "KmsKeyId");
820        let iam_database_authentication_enabled = parse_optional_bool(
821            optional_query_param(request, "EnableIAMDatabaseAuthentication").as_deref(),
822        )?
823        .unwrap_or(false);
824        let iops = optional_i32_param(request, "Iops")?;
825        let monitoring_interval = optional_i32_param(request, "MonitoringInterval")?;
826        let monitoring_role_arn = optional_query_param(request, "MonitoringRoleArn");
827        let performance_insights_enabled = parse_optional_bool(
828            optional_query_param(request, "EnablePerformanceInsights").as_deref(),
829        )?
830        .unwrap_or(false);
831        let performance_insights_kms_key_id =
832            optional_query_param(request, "PerformanceInsightsKMSKeyId");
833        let performance_insights_retention_period =
834            optional_i32_param(request, "PerformanceInsightsRetentionPeriod")?;
835        let enabled_cloudwatch_logs_exports =
836            parse_cloudwatch_logs_exports(request, "EnableCloudwatchLogsExports");
837        let ca_certificate_identifier = optional_query_param(request, "CACertificateIdentifier");
838        let network_type = optional_query_param(request, "NetworkType");
839        let character_set_name = optional_query_param(request, "CharacterSetName");
840        let auto_minor_version_upgrade = parse_optional_bool(
841            optional_query_param(request, "AutoMinorVersionUpgrade").as_deref(),
842        )?;
843        let copy_tags_to_snapshot =
844            parse_optional_bool(optional_query_param(request, "CopyTagsToSnapshot").as_deref())?;
845        let db_cluster_identifier = optional_query_param(request, "DBClusterIdentifier");
846
847        validate_create_request(
848            &db_instance_identifier,
849            allocated_storage,
850            &db_instance_class,
851            &engine,
852            &engine_version,
853            port,
854        )?;
855
856        {
857            let mut accounts = self.state.write();
858            let state = accounts.get_or_create(&request.account_id);
859            if !state.begin_instance_creation(&db_instance_identifier) {
860                return Err(AwsServiceError::aws_error(
861                    StatusCode::BAD_REQUEST,
862                    "DBInstanceAlreadyExists",
863                    format!("DBInstance {} already exists.", db_instance_identifier),
864                ));
865            }
866            // Validate parameter group exists if specified by the caller
867            if let Some(ref pg_name) = db_parameter_group_name {
868                if !state.parameter_groups.contains_key(pg_name) {
869                    state.cancel_instance_creation(&db_instance_identifier);
870                    return Err(AwsServiceError::aws_error(
871                        StatusCode::NOT_FOUND,
872                        "DBParameterGroupNotFound",
873                        format!("DBParameterGroup {} not found.", pg_name),
874                    ));
875                }
876            }
877        }
878
879        let runtime = self.require_runtime()?.clone();
880
881        let logical_db_name = db_name
882            .clone()
883            .unwrap_or_else(|| default_db_name(&engine).to_string());
884
885        // Insert a "creating" placeholder synchronously and spawn the
886        // container start in a background task. CreateDBInstance returns
887        // ~immediately; DescribeDBInstances flips to "available" (or
888        // "failed") when the container is up. Matches AWS RDS behavior:
889        // CreateDBInstance never blocks on the container coming up.
890        let created_at = Utc::now();
891        let instance = {
892            let mut accounts = self.state.write();
893            let state = accounts.get_or_create(&request.account_id);
894            let placeholder = DbInstance {
895                db_instance_identifier: db_instance_identifier.clone(),
896                db_instance_arn: state.db_instance_arn(&db_instance_identifier),
897                db_instance_class: db_instance_class.clone(),
898                engine: engine.clone(),
899                engine_version: engine_version.clone(),
900                db_instance_status: "creating".to_string(),
901                master_username: master_username.clone(),
902                db_name: db_name.clone(),
903                endpoint_address: String::new(),
904                port: 0,
905                allocated_storage,
906                publicly_accessible,
907                deletion_protection,
908                created_at,
909                dbi_resource_id: state.next_dbi_resource_id(),
910                master_user_password: master_user_password.clone(),
911                container_id: String::new(),
912                host_port: 0,
913                tags: Vec::new(),
914                read_replica_source_db_instance_identifier: None,
915                read_replica_db_instance_identifiers: Vec::new(),
916                vpc_security_group_ids,
917                db_parameter_group_name,
918                backup_retention_period,
919                preferred_backup_window,
920                preferred_maintenance_window: None,
921                latest_restorable_time: if backup_retention_period > 0 {
922                    Some(created_at)
923                } else {
924                    None
925                },
926                option_group_name,
927                multi_az,
928                pending_modified_values: None,
929                availability_zone,
930                storage_type,
931                storage_encrypted,
932                kms_key_id,
933                iam_database_authentication_enabled,
934                iops,
935                monitoring_interval,
936                monitoring_role_arn,
937                performance_insights_enabled,
938                performance_insights_kms_key_id,
939                performance_insights_retention_period,
940                enabled_cloudwatch_logs_exports,
941                ca_certificate_identifier,
942                network_type,
943                character_set_name,
944                auto_minor_version_upgrade,
945                copy_tags_to_snapshot,
946                master_user_secret_arn: None,
947                master_user_secret_kms_key_id: None,
948                license_model: None,
949                max_allocated_storage: None,
950                multi_tenant: None,
951                storage_throughput: None,
952                tde_credential_arn: None,
953                delete_automated_backups: None,
954                db_security_groups: Vec::new(),
955                domain: None,
956                domain_fqdn: None,
957                domain_ou: None,
958                domain_iam_role_name: None,
959                domain_auth_secret_arn: None,
960                domain_dns_ips: Vec::new(),
961                db_cluster_identifier: db_cluster_identifier.clone(),
962            };
963            state.finish_instance_creation(placeholder.clone());
964            placeholder
965        };
966        let instance_arn = instance.db_instance_arn.clone();
967
968        self.emit_event(
969            RdsSourceType::DbInstance,
970            &db_instance_identifier,
971            &instance_arn,
972            "RDS-EVENT-0005",
973            &["creation"],
974            "DB instance created",
975        );
976
977        {
978            let state_handle = self.state.clone();
979            let delivery_bus = self.delivery_bus.clone();
980            let runtime = runtime.clone();
981            let id = db_instance_identifier.clone();
982            let engine = engine.clone();
983            let engine_version = engine_version.clone();
984            let master_username = master_username.clone();
985            let master_user_password = master_user_password.clone();
986            let logical_db_name_task = logical_db_name.clone();
987            let account_id = request.account_id.clone();
988            let region = request.region.clone();
989            let arn = instance_arn.clone();
990            let snapshot_store = self.snapshot_store.clone();
991            let snapshot_lock = self.snapshot_lock.clone();
992            let cluster_id_for_attach = db_cluster_identifier.clone();
993            tokio::spawn(async move {
994                match runtime
995                    .ensure_postgres(
996                        &id,
997                        &engine,
998                        &engine_version,
999                        &master_username,
1000                        &master_user_password,
1001                        &logical_db_name_task,
1002                        &account_id,
1003                        &region,
1004                    )
1005                    .await
1006                {
1007                    Ok(running) => {
1008                        // If the cluster has a pending restore dump
1009                        // staged by RestoreDBClusterFromSnapshot /
1010                        // RestoreDBClusterToPointInTime, drain it and
1011                        // replay onto this fresh instance. We do this
1012                        // before flipping status to available so the
1013                        // first read of "available" sees restored data.
1014                        let pending_dump = if let Some(ref cid) = cluster_id_for_attach {
1015                            let mut accounts = state_handle.write();
1016                            let state = accounts.get_or_create(&account_id);
1017                            state
1018                                .extras
1019                                .get_mut("clusters")
1020                                .and_then(|m| m.get_mut(cid))
1021                                .and_then(|entry| entry.as_object_mut())
1022                                .and_then(|obj| obj.remove("PendingRestoreDumpB64"))
1023                                .and_then(|v| v.as_str().map(str::to_string))
1024                                .and_then(|b64| {
1025                                    use base64::Engine;
1026                                    base64::engine::general_purpose::STANDARD
1027                                        .decode(b64.as_bytes())
1028                                        .ok()
1029                                })
1030                        } else {
1031                            None
1032                        };
1033                        if let Some(dump) = pending_dump {
1034                            if let Err(error) = runtime
1035                                .restore_database(
1036                                    &id,
1037                                    &engine,
1038                                    &master_username,
1039                                    &master_user_password,
1040                                    &logical_db_name_task,
1041                                    &dump,
1042                                )
1043                                .await
1044                            {
1045                                tracing::error!(%error, db_instance_identifier=%id, "cluster restore dump replay failed");
1046                            }
1047                        }
1048
1049                        {
1050                            let mut accounts = state_handle.write();
1051                            let state = accounts.get_or_create(&account_id);
1052                            if let Some(inst) = state.instances.get_mut(&id) {
1053                                inst.db_instance_status = "available".to_string();
1054                                inst.endpoint_address = "127.0.0.1".to_string();
1055                                inst.port = i32::from(running.host_port);
1056                                inst.host_port = running.host_port;
1057                                inst.container_id = running.container_id;
1058                            }
1059                            // Register as cluster member so failover /
1060                            // restore paths can find the writer.
1061                            if let Some(ref cid) = cluster_id_for_attach {
1062                                attach_cluster_member(state, cid, &id);
1063                            }
1064                        }
1065                        // Persist the flipped status. Without this the
1066                        // synchronous CreateDBInstance save captures the
1067                        // `creating` placeholder, which the load path
1068                        // discards on restart, dropping the instance.
1069                        save_snapshot_static(
1070                            state_handle.clone(),
1071                            snapshot_store.clone(),
1072                            snapshot_lock.clone(),
1073                        )
1074                        .await;
1075                    }
1076                    Err(error) => {
1077                        tracing::error!(%error, db_instance_identifier=%id, "create_db_instance background task failed");
1078                        {
1079                            let mut accounts = state_handle.write();
1080                            let state = accounts.get_or_create(&account_id);
1081                            state.instances.remove(&id);
1082                        }
1083                        save_snapshot_static(
1084                            state_handle.clone(),
1085                            snapshot_store.clone(),
1086                            snapshot_lock.clone(),
1087                        )
1088                        .await;
1089                        emit_event_static(
1090                            delivery_bus.as_ref(),
1091                            RdsSourceType::DbInstance,
1092                            &id,
1093                            &arn,
1094                            "RDS-EVENT-0058",
1095                            &["failure"],
1096                            &format!("DB instance failed to create: {}", error),
1097                        );
1098                    }
1099                }
1100            });
1101        }
1102
1103        Ok(AwsResponse::xml(
1104            StatusCode::OK,
1105            query_response_xml(
1106                "CreateDBInstance",
1107                RDS_NS,
1108                &format!(
1109                    "<DBInstance>{}</DBInstance>",
1110                    db_instance_xml(&instance, None)
1111                ),
1112                &request.request_id,
1113            ),
1114        ))
1115    }
1116
1117    async fn delete_db_instance(
1118        &self,
1119        request: &AwsRequest,
1120    ) -> Result<AwsResponse, AwsServiceError> {
1121        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
1122        let skip_final_snapshot =
1123            parse_optional_bool(optional_query_param(request, "SkipFinalSnapshot").as_deref())?
1124                .unwrap_or(false);
1125        let final_db_snapshot_identifier =
1126            optional_query_param(request, "FinalDBSnapshotIdentifier");
1127
1128        if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
1129            return Err(AwsServiceError::aws_error(
1130                StatusCode::BAD_REQUEST,
1131                "InvalidDBInstanceState",
1132                "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
1133            ));
1134        }
1135        if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
1136            return Err(AwsServiceError::aws_error(
1137                StatusCode::BAD_REQUEST,
1138                "InvalidDBInstanceState",
1139                "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
1140            ));
1141        }
1142
1143        // Check deletion protection BEFORE creating snapshot or making any changes
1144        {
1145            let accounts = self.state.read();
1146            let empty = RdsState::new(&request.account_id, &request.region);
1147            let state = accounts.get(&request.account_id).unwrap_or(&empty);
1148            if let Some(instance) = state.instances.get(&db_instance_identifier) {
1149                if instance.deletion_protection {
1150                    return Err(AwsServiceError::aws_error(
1151                        StatusCode::BAD_REQUEST,
1152                        "InvalidDBInstanceState",
1153                        format!(
1154                            "DBInstance {} cannot be deleted because deletion protection is enabled.",
1155                            db_instance_identifier
1156                        ),
1157                    ));
1158                }
1159            } else {
1160                return Err(db_instance_not_found(&db_instance_identifier));
1161            }
1162        }
1163
1164        if let Some(ref snapshot_id) = final_db_snapshot_identifier {
1165            self.create_final_db_snapshot(
1166                &db_instance_identifier,
1167                snapshot_id,
1168                &request.account_id,
1169                &request.region,
1170            )
1171            .await?;
1172        }
1173
1174        let instance = {
1175            let mut accounts = self.state.write();
1176            let state = accounts.get_or_create(&request.account_id);
1177            let instance = state
1178                .instances
1179                .remove(&db_instance_identifier)
1180                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1181
1182            if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
1183                if let Some(source) = state.instances.get_mut(source_id) {
1184                    source
1185                        .read_replica_db_instance_identifiers
1186                        .retain(|id| id != &db_instance_identifier);
1187                }
1188            }
1189
1190            for replica_id in &instance.read_replica_db_instance_identifiers {
1191                if let Some(replica) = state.instances.get_mut(replica_id) {
1192                    replica.read_replica_source_db_instance_identifier = None;
1193                }
1194            }
1195
1196            instance
1197        };
1198
1199        if let Some(runtime) = &self.runtime {
1200            runtime.stop_container(&db_instance_identifier).await;
1201        }
1202
1203        self.emit_event(
1204            RdsSourceType::DbInstance,
1205            &db_instance_identifier,
1206            &instance.db_instance_arn,
1207            "RDS-EVENT-0003",
1208            &["deletion"],
1209            "DB instance deleted",
1210        );
1211
1212        Ok(AwsResponse::xml(
1213            StatusCode::OK,
1214            query_response_xml(
1215                "DeleteDBInstance",
1216                RDS_NS,
1217                &format!(
1218                    "<DBInstance>{}</DBInstance>",
1219                    db_instance_xml(&instance, Some("deleting"))
1220                ),
1221                &request.request_id,
1222            ),
1223        ))
1224    }
1225
1226    /// Take a final snapshot of an instance that is about to be deleted,
1227    /// persisting the dumped database into `state.snapshots`. The DLQ-style
1228    /// conflict check runs twice — once under the read lock before paying
1229    /// for the dump, once under the write lock before committing — to keep
1230    /// concurrent deletes from colliding.
1231    async fn create_final_db_snapshot(
1232        &self,
1233        db_instance_identifier: &str,
1234        snapshot_id: &str,
1235        account_id: &str,
1236        region: &str,
1237    ) -> Result<(), AwsServiceError> {
1238        let runtime = self.runtime.as_ref().ok_or_else(|| {
1239            AwsServiceError::aws_error(
1240                StatusCode::SERVICE_UNAVAILABLE,
1241                "InvalidDBSnapshotState",
1242                "Docker/Podman is required for RDS snapshots but is not available",
1243            )
1244        })?;
1245
1246        let (instance_for_snapshot, db_name) = {
1247            let accounts = self.state.read();
1248            let empty = RdsState::new(account_id, region);
1249            let state = accounts.get(account_id).unwrap_or(&empty);
1250
1251            if state.snapshots.contains_key(snapshot_id) {
1252                return Err(AwsServiceError::aws_error(
1253                    StatusCode::CONFLICT,
1254                    "DBSnapshotAlreadyExists",
1255                    format!("DBSnapshot {snapshot_id} already exists."),
1256                ));
1257            }
1258
1259            let instance = state
1260                .instances
1261                .get(db_instance_identifier)
1262                .cloned()
1263                .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
1264
1265            let default_db = default_db_name(&instance.engine);
1266            let db_name = instance
1267                .db_name
1268                .as_deref()
1269                .unwrap_or(default_db)
1270                .to_string();
1271
1272            (instance, db_name)
1273        };
1274
1275        let dump_data = runtime
1276            .dump_database(
1277                db_instance_identifier,
1278                &instance_for_snapshot.engine,
1279                &instance_for_snapshot.master_username,
1280                &instance_for_snapshot.master_user_password,
1281                &db_name,
1282            )
1283            .await
1284            .map_err(runtime_error_to_service_error)?;
1285
1286        let mut accounts = self.state.write();
1287        let state = accounts.get_or_create(account_id);
1288
1289        if state.snapshots.contains_key(snapshot_id) {
1290            return Err(AwsServiceError::aws_error(
1291                StatusCode::CONFLICT,
1292                "DBSnapshotAlreadyExists",
1293                format!("DBSnapshot {snapshot_id} already exists."),
1294            ));
1295        }
1296
1297        let snapshot_arn = state.db_snapshot_arn(snapshot_id);
1298
1299        let snapshot = DbSnapshot {
1300            db_snapshot_identifier: snapshot_id.to_string(),
1301            db_snapshot_arn: snapshot_arn,
1302            db_instance_identifier: db_instance_identifier.to_string(),
1303            snapshot_create_time: Utc::now(),
1304            engine: instance_for_snapshot.engine.clone(),
1305            engine_version: instance_for_snapshot.engine_version.clone(),
1306            allocated_storage: instance_for_snapshot.allocated_storage,
1307            status: "available".to_string(),
1308            port: instance_for_snapshot.port,
1309            master_username: instance_for_snapshot.master_username.clone(),
1310            db_name: instance_for_snapshot.db_name.clone(),
1311            dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
1312            snapshot_type: "automated".to_string(),
1313            master_user_password: instance_for_snapshot.master_user_password.clone(),
1314            tags: Vec::new(),
1315            dump_data,
1316            availability_zone: instance_for_snapshot.availability_zone.clone(),
1317            vpc_id: None,
1318            instance_create_time: Some(instance_for_snapshot.created_at),
1319            license_model: Some(
1320                service_helpers::license_model_for_engine(&instance_for_snapshot.engine)
1321                    .to_string(),
1322            ),
1323            iops: instance_for_snapshot.iops,
1324            option_group_name: instance_for_snapshot.option_group_name.clone(),
1325            percent_progress: Some(100),
1326            storage_type: instance_for_snapshot.storage_type.clone(),
1327            encrypted: instance_for_snapshot.storage_encrypted,
1328            kms_key_id: instance_for_snapshot.kms_key_id.clone(),
1329            iam_database_authentication_enabled: instance_for_snapshot
1330                .iam_database_authentication_enabled,
1331            timezone: None,
1332            storage_throughput: None,
1333        };
1334
1335        state.snapshots.insert(snapshot_id.to_string(), snapshot);
1336        Ok(())
1337    }
1338
1339    fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1340        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
1341        let apply_immediately =
1342            parse_optional_bool(optional_query_param(request, "ApplyImmediately").as_deref())?;
1343
1344        // Parse every Modify input up front; routing into the always-
1345        // immediate or ApplyImmediately-gated path happens further down.
1346        let deletion_protection =
1347            parse_optional_bool(optional_query_param(request, "DeletionProtection").as_deref())?;
1348        let backup_retention_period =
1349            parse_optional_i32(optional_query_param(request, "BackupRetentionPeriod").as_deref())?;
1350        let preferred_backup_window = optional_query_param(request, "PreferredBackupWindow");
1351        let preferred_maintenance_window =
1352            optional_query_param(request, "PreferredMaintenanceWindow");
1353        let db_parameter_group_name = optional_query_param(request, "DBParameterGroupName");
1354        let master_user_secret_kms_key_id =
1355            optional_query_param(request, "MasterUserSecretKmsKeyId");
1356        let ca_certificate_identifier = optional_query_param(request, "CACertificateIdentifier");
1357        let monitoring_interval =
1358            parse_optional_i32(optional_query_param(request, "MonitoringInterval").as_deref())?;
1359        let option_group_name = optional_query_param(request, "OptionGroupName");
1360        let auto_minor_version_upgrade = parse_optional_bool(
1361            optional_query_param(request, "AutoMinorVersionUpgrade").as_deref(),
1362        )?;
1363        let copy_tags_to_snapshot =
1364            parse_optional_bool(optional_query_param(request, "CopyTagsToSnapshot").as_deref())?;
1365        let delete_automated_backups = parse_optional_bool(
1366            optional_query_param(request, "DeleteAutomatedBackups").as_deref(),
1367        )?;
1368        let enable_iam_db_auth = parse_optional_bool(
1369            optional_query_param(request, "EnableIAMDatabaseAuthentication").as_deref(),
1370        )?;
1371        let max_allocated_storage =
1372            parse_optional_i32(optional_query_param(request, "MaxAllocatedStorage").as_deref())?;
1373        let network_type = optional_query_param(request, "NetworkType");
1374        let domain = optional_query_param(request, "Domain");
1375        let domain_fqdn = optional_query_param(request, "DomainFqdn");
1376        let domain_ou = optional_query_param(request, "DomainOu");
1377        let domain_iam_role_name = optional_query_param(request, "DomainIAMRoleName");
1378        let domain_auth_secret_arn = optional_query_param(request, "DomainAuthSecretArn");
1379        let domain_dns_ips = {
1380            let v = parse_string_member_list(request, "DomainDnsIps");
1381            if v.is_empty() {
1382                None
1383            } else {
1384                Some(v)
1385            }
1386        };
1387        let disable_domain =
1388            parse_optional_bool(optional_query_param(request, "DisableDomain").as_deref())?;
1389        let rotate_master_user_password = parse_optional_bool(
1390            optional_query_param(request, "RotateMasterUserPassword").as_deref(),
1391        )?;
1392
1393        let db_instance_class = optional_query_param(request, "DBInstanceClass");
1394        let master_user_password = optional_query_param(request, "MasterUserPassword");
1395        let engine_version = optional_query_param(request, "EngineVersion");
1396        let allocated_storage =
1397            parse_optional_i32(optional_query_param(request, "AllocatedStorage").as_deref())?;
1398        let multi_az = parse_optional_bool(optional_query_param(request, "MultiAZ").as_deref())?;
1399        let iops = parse_optional_i32(optional_query_param(request, "Iops").as_deref())?;
1400        let storage_type = optional_query_param(request, "StorageType");
1401        let storage_throughput =
1402            parse_optional_i32(optional_query_param(request, "StorageThroughput").as_deref())?;
1403        let performance_insights_enabled = parse_optional_bool(
1404            optional_query_param(request, "EnablePerformanceInsights").as_deref(),
1405        )?;
1406        let license_model = optional_query_param(request, "LicenseModel");
1407        let multi_tenant =
1408            parse_optional_bool(optional_query_param(request, "MultiTenant").as_deref())?;
1409        let publicly_accessible =
1410            parse_optional_bool(optional_query_param(request, "PubliclyAccessible").as_deref())?;
1411        let tde_credential_arn = optional_query_param(request, "TdeCredentialArn");
1412        let db_port_number =
1413            parse_optional_i32(optional_query_param(request, "DBPortNumber").as_deref())?;
1414
1415        // CloudWatch logs exports — AWS lets callers both opt-in to and
1416        // opt-out of specific log types in the same call. We compute the
1417        // resulting set per AWS semantics: start from current, remove
1418        // DisableLogTypes, then union with EnableLogTypes.
1419        let cloudwatch_enable = collect_cloudwatch_log_types(request, "EnableLogTypes");
1420        let cloudwatch_disable = collect_cloudwatch_log_types(request, "DisableLogTypes");
1421        let cloudwatch_changed = !cloudwatch_enable.is_empty() || !cloudwatch_disable.is_empty();
1422
1423        // Parse VPC security group IDs - only if at least one is provided
1424        let vpc_security_group_ids = {
1425            let mut ids = Vec::new();
1426            for index in 1.. {
1427                let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
1428                match optional_query_param(request, &sg_id_name) {
1429                    Some(sg_id) => ids.push(sg_id),
1430                    None => break,
1431                }
1432            }
1433            if ids.is_empty() {
1434                None
1435            } else {
1436                Some(ids)
1437            }
1438        };
1439
1440        // Legacy classic-only DBSecurityGroups list. AWS still accepts the
1441        // parameter even on VPC instances; we record it verbatim.
1442        let db_security_groups = {
1443            let mut ids = Vec::new();
1444            for index in 1.. {
1445                let key = format!("DBSecurityGroups.DBSecurityGroupName.{index}");
1446                match optional_query_param(request, &key) {
1447                    Some(name) => ids.push(name),
1448                    None => break,
1449                }
1450            }
1451            if ids.is_empty() {
1452                None
1453            } else {
1454                Some(ids)
1455            }
1456        };
1457
1458        if let Some(ref class) = db_instance_class {
1459            validate_db_instance_class(class)?;
1460        }
1461
1462        // At-least-one mutable field must be present. We accept every
1463        // mutable RDS Modify input, so we only reject the trivial case
1464        // where the caller supplied just `DBInstanceIdentifier`.
1465        let any_mutable_field = db_instance_class.is_some()
1466            || deletion_protection.is_some()
1467            || vpc_security_group_ids.is_some()
1468            || db_security_groups.is_some()
1469            || master_user_password.is_some()
1470            || backup_retention_period.is_some()
1471            || preferred_backup_window.is_some()
1472            || preferred_maintenance_window.is_some()
1473            || engine_version.is_some()
1474            || allocated_storage.is_some()
1475            || db_parameter_group_name.is_some()
1476            || multi_az.is_some()
1477            || iops.is_some()
1478            || storage_type.is_some()
1479            || storage_throughput.is_some()
1480            || master_user_secret_kms_key_id.is_some()
1481            || ca_certificate_identifier.is_some()
1482            || monitoring_interval.is_some()
1483            || performance_insights_enabled.is_some()
1484            || cloudwatch_changed
1485            || option_group_name.is_some()
1486            || auto_minor_version_upgrade.is_some()
1487            || copy_tags_to_snapshot.is_some()
1488            || delete_automated_backups.is_some()
1489            || enable_iam_db_auth.is_some()
1490            || max_allocated_storage.is_some()
1491            || network_type.is_some()
1492            || license_model.is_some()
1493            || multi_tenant.is_some()
1494            || publicly_accessible.is_some()
1495            || tde_credential_arn.is_some()
1496            || db_port_number.is_some()
1497            || domain.is_some()
1498            || domain_fqdn.is_some()
1499            || domain_ou.is_some()
1500            || domain_iam_role_name.is_some()
1501            || domain_auth_secret_arn.is_some()
1502            || domain_dns_ips.is_some()
1503            || disable_domain.is_some()
1504            || rotate_master_user_password.is_some();
1505        let mut accounts = self.state.write();
1506        let state = accounts.get_or_create(&request.account_id);
1507        let instance = state
1508            .instances
1509            .get_mut(&db_instance_identifier)
1510            .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1511
1512        // Smithy declares no `InvalidParameterCombination` shape on
1513        // ModifyDBInstance, so "no fields to mutate" treats as a no-op
1514        // (real AWS also returns a near-empty pending modified values
1515        // block in that case). Earlier resource-not-found check above
1516        // guarantees the instance exists.
1517        let _ = any_mutable_field;
1518
1519        // ── Always-immediate fields (ApplyImmediately ignored) ──
1520        if let Some(deletion_protection) = deletion_protection {
1521            instance.deletion_protection = deletion_protection;
1522        }
1523        if let Some(security_group_ids) = vpc_security_group_ids {
1524            instance.vpc_security_group_ids = security_group_ids;
1525        }
1526        if let Some(sg_names) = db_security_groups {
1527            instance.db_security_groups = sg_names;
1528        }
1529        if let Some(ca_id) = ca_certificate_identifier {
1530            instance.ca_certificate_identifier = Some(ca_id);
1531        }
1532        if let Some(kms_key) = master_user_secret_kms_key_id {
1533            instance.master_user_secret_kms_key_id = Some(kms_key);
1534        }
1535        if let Some(name) = option_group_name {
1536            instance.option_group_name = Some(name);
1537        }
1538        if let Some(b) = auto_minor_version_upgrade {
1539            instance.auto_minor_version_upgrade = Some(b);
1540        }
1541        if let Some(b) = copy_tags_to_snapshot {
1542            instance.copy_tags_to_snapshot = Some(b);
1543        }
1544        if let Some(b) = delete_automated_backups {
1545            instance.delete_automated_backups = Some(b);
1546        }
1547        if let Some(b) = enable_iam_db_auth {
1548            instance.iam_database_authentication_enabled = b;
1549        }
1550        if let Some(n) = max_allocated_storage {
1551            instance.max_allocated_storage = Some(n);
1552        }
1553        if let Some(nt) = network_type {
1554            instance.network_type = Some(nt);
1555        }
1556        if disable_domain == Some(true) {
1557            instance.domain = None;
1558            instance.domain_fqdn = None;
1559            instance.domain_ou = None;
1560            instance.domain_iam_role_name = None;
1561            instance.domain_auth_secret_arn = None;
1562            instance.domain_dns_ips.clear();
1563        } else {
1564            if let Some(v) = domain {
1565                instance.domain = Some(v);
1566            }
1567            if let Some(v) = domain_fqdn {
1568                instance.domain_fqdn = Some(v);
1569            }
1570            if let Some(v) = domain_ou {
1571                instance.domain_ou = Some(v);
1572            }
1573            if let Some(v) = domain_iam_role_name {
1574                instance.domain_iam_role_name = Some(v);
1575            }
1576            if let Some(v) = domain_auth_secret_arn {
1577                instance.domain_auth_secret_arn = Some(v);
1578            }
1579            if let Some(v) = domain_dns_ips {
1580                instance.domain_dns_ips = v;
1581            }
1582        }
1583        if cloudwatch_changed {
1584            let mut current: Vec<String> = instance.enabled_cloudwatch_logs_exports.clone();
1585            current.retain(|t| !cloudwatch_disable.contains(t));
1586            for t in &cloudwatch_enable {
1587                if !current.contains(t) {
1588                    current.push(t.clone());
1589                }
1590            }
1591            instance.enabled_cloudwatch_logs_exports = current;
1592        }
1593        // RotateMasterUserPassword: AWS rotates the secret in place. We
1594        // record a marker by bumping a synthetic password — callers don't
1595        // see plaintext, only that the secret status remains active.
1596        if rotate_master_user_password == Some(true) {
1597            instance.master_user_password = format!("rotated-{}", uuid::Uuid::new_v4().simple());
1598        }
1599
1600        // ── ApplyImmediately-gated fields ────────────────────────
1601        let immediate = apply_immediately != Some(false);
1602        if immediate {
1603            if let Some(class) = db_instance_class {
1604                instance.db_instance_class = class;
1605            }
1606            if let Some(pwd) = master_user_password {
1607                instance.master_user_password = pwd;
1608            }
1609            if let Some(version) = engine_version {
1610                instance.engine_version = version;
1611            }
1612            if let Some(storage) = allocated_storage {
1613                instance.allocated_storage = storage;
1614            }
1615            if let Some(name) = db_parameter_group_name {
1616                instance.db_parameter_group_name = Some(name);
1617            }
1618            if let Some(az) = multi_az {
1619                instance.multi_az = az;
1620            }
1621            if let Some(iops_val) = iops {
1622                instance.iops = Some(iops_val);
1623            }
1624            if let Some(stype) = storage_type {
1625                instance.storage_type = Some(stype);
1626            }
1627            if let Some(t) = storage_throughput {
1628                instance.storage_throughput = Some(t);
1629            }
1630            if let Some(pi) = performance_insights_enabled {
1631                instance.performance_insights_enabled = pi;
1632            }
1633            if let Some(lm) = license_model {
1634                instance.license_model = Some(lm);
1635            }
1636            if let Some(b) = multi_tenant {
1637                instance.multi_tenant = Some(b);
1638            }
1639            if let Some(b) = publicly_accessible {
1640                instance.publicly_accessible = b;
1641            }
1642            if let Some(arn) = tde_credential_arn {
1643                instance.tde_credential_arn = Some(arn);
1644            }
1645            if let Some(p) = db_port_number {
1646                instance.port = p;
1647            }
1648            if let Some(retention) = backup_retention_period {
1649                instance.backup_retention_period = retention;
1650            }
1651            if let Some(window) = preferred_backup_window {
1652                instance.preferred_backup_window = window;
1653            }
1654            if let Some(window) = preferred_maintenance_window {
1655                instance.preferred_maintenance_window = Some(window);
1656            }
1657            if let Some(interval) = monitoring_interval {
1658                instance.monitoring_interval = Some(interval);
1659            }
1660        } else {
1661            let any_deferred = db_instance_class.is_some()
1662                || master_user_password.is_some()
1663                || engine_version.is_some()
1664                || allocated_storage.is_some()
1665                || db_parameter_group_name.is_some()
1666                || multi_az.is_some()
1667                || iops.is_some()
1668                || storage_type.is_some()
1669                || storage_throughput.is_some()
1670                || performance_insights_enabled.is_some()
1671                || license_model.is_some()
1672                || multi_tenant.is_some()
1673                || publicly_accessible.is_some()
1674                || tde_credential_arn.is_some()
1675                || db_port_number.is_some()
1676                || backup_retention_period.is_some()
1677                || preferred_backup_window.is_some()
1678                || preferred_maintenance_window.is_some()
1679                || monitoring_interval.is_some();
1680            if any_deferred {
1681                let pending = instance
1682                    .pending_modified_values
1683                    .get_or_insert(Default::default());
1684                if let Some(class) = db_instance_class {
1685                    pending.db_instance_class = Some(class);
1686                }
1687                if let Some(pwd) = master_user_password {
1688                    pending.master_user_password = Some(pwd);
1689                }
1690                if let Some(version) = engine_version {
1691                    pending.engine_version = Some(version);
1692                }
1693                if let Some(storage) = allocated_storage {
1694                    pending.allocated_storage = Some(storage);
1695                }
1696                if let Some(name) = db_parameter_group_name {
1697                    pending.db_parameter_group_name = Some(name);
1698                }
1699                if let Some(az) = multi_az {
1700                    pending.multi_az = Some(az);
1701                }
1702                if let Some(iops_val) = iops {
1703                    pending.iops = Some(iops_val);
1704                }
1705                if let Some(stype) = storage_type {
1706                    pending.storage_type = Some(stype);
1707                }
1708                if let Some(t) = storage_throughput {
1709                    pending.storage_throughput = Some(t);
1710                }
1711                if let Some(pi) = performance_insights_enabled {
1712                    pending.performance_insights_enabled = Some(pi);
1713                }
1714                if let Some(lm) = license_model {
1715                    pending.license_model = Some(lm);
1716                }
1717                if let Some(b) = multi_tenant {
1718                    pending.multi_tenant = Some(b);
1719                }
1720                if let Some(b) = publicly_accessible {
1721                    pending.publicly_accessible = Some(b);
1722                }
1723                if let Some(arn) = tde_credential_arn {
1724                    pending.tde_credential_arn = Some(arn);
1725                }
1726                if let Some(p) = db_port_number {
1727                    pending.port = Some(p);
1728                }
1729                if let Some(retention) = backup_retention_period {
1730                    pending.backup_retention_period = Some(retention);
1731                }
1732                if let Some(window) = preferred_backup_window {
1733                    pending.preferred_backup_window = Some(window);
1734                }
1735                if let Some(window) = preferred_maintenance_window {
1736                    pending.preferred_maintenance_window = Some(window);
1737                }
1738                if let Some(interval) = monitoring_interval {
1739                    pending.monitoring_interval = Some(interval);
1740                }
1741            }
1742        }
1743        let instance_arn = instance.db_instance_arn.clone();
1744        let xml = query_response_xml(
1745            "ModifyDBInstance",
1746            RDS_NS,
1747            &format!(
1748                "<DBInstance>{}</DBInstance>",
1749                db_instance_xml(instance, Some("modifying"))
1750            ),
1751            &request.request_id,
1752        );
1753        drop(accounts);
1754
1755        self.emit_event(
1756            RdsSourceType::DbInstance,
1757            &db_instance_identifier,
1758            &instance_arn,
1759            "RDS-EVENT-0014",
1760            &["configuration change"],
1761            "DB instance was modified",
1762        );
1763
1764        Ok(AwsResponse::xml(StatusCode::OK, xml))
1765    }
1766
1767    async fn reboot_db_instance(
1768        &self,
1769        request: &AwsRequest,
1770    ) -> Result<AwsResponse, AwsServiceError> {
1771        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
1772        let force_failover =
1773            parse_optional_bool(optional_query_param(request, "ForceFailover").as_deref())?;
1774        if force_failover == Some(true) {
1775            return Err(AwsServiceError::aws_error(
1776                StatusCode::BAD_REQUEST,
1777                "InvalidDBInstanceState",
1778                "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
1779            ));
1780        }
1781
1782        let instance = {
1783            let accounts = self.state.read();
1784            let empty = RdsState::new(&request.account_id, &request.region);
1785            let state = accounts.get(&request.account_id).unwrap_or(&empty);
1786            state
1787                .instances
1788                .get(&db_instance_identifier)
1789                .cloned()
1790                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
1791        };
1792
1793        let runtime = self.require_runtime()?;
1794
1795        let running = runtime
1796            .restart_container(
1797                &db_instance_identifier,
1798                &instance.engine,
1799                &instance.master_username,
1800                &instance.master_user_password,
1801                instance
1802                    .db_name
1803                    .as_deref()
1804                    .unwrap_or(default_db_name(&instance.engine)),
1805            )
1806            .await
1807            .map_err(runtime_error_to_service_error)?;
1808
1809        let instance = {
1810            let mut accounts = self.state.write();
1811            let state = accounts.get_or_create(&request.account_id);
1812            let instance = state
1813                .instances
1814                .get_mut(&db_instance_identifier)
1815                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1816            instance.host_port = running.host_port;
1817            instance.port = i32::from(running.host_port);
1818
1819            // Apply any pending modifications
1820            if let Some(pending) = instance.pending_modified_values.take() {
1821                apply_pending_to_instance(instance, pending);
1822            }
1823
1824            instance.clone()
1825        };
1826
1827        self.emit_event(
1828            RdsSourceType::DbInstance,
1829            &db_instance_identifier,
1830            &instance.db_instance_arn,
1831            "RDS-EVENT-0006",
1832            &["availability"],
1833            "DB instance restarted",
1834        );
1835
1836        Ok(AwsResponse::xml(
1837            StatusCode::OK,
1838            query_response_xml(
1839                "RebootDBInstance",
1840                RDS_NS,
1841                &format!(
1842                    "<DBInstance>{}</DBInstance>",
1843                    db_instance_xml(&instance, Some("rebooting"))
1844                ),
1845                &request.request_id,
1846            ),
1847        ))
1848    }
1849
1850    fn describe_db_engine_versions(
1851        &self,
1852        request: &AwsRequest,
1853    ) -> Result<AwsResponse, AwsServiceError> {
1854        let engine = optional_query_param(request, "Engine");
1855        let engine_version = optional_query_param(request, "EngineVersion");
1856        let family = optional_query_param(request, "DBParameterGroupFamily");
1857        let default_only =
1858            parse_optional_bool(optional_query_param(request, "DefaultOnly").as_deref())?;
1859
1860        let mut versions = filter_engine_versions(
1861            &default_engine_versions(),
1862            &engine,
1863            &engine_version,
1864            &family,
1865        );
1866
1867        if default_only.unwrap_or(false) {
1868            versions.truncate(1);
1869        }
1870
1871        Ok(AwsResponse::xml(
1872            StatusCode::OK,
1873            query_response_xml(
1874                "DescribeDBEngineVersions",
1875                RDS_NS,
1876                &format!(
1877                    "<DBEngineVersions>{}</DBEngineVersions>",
1878                    versions.iter().map(engine_version_xml).collect::<String>()
1879                ),
1880                &request.request_id,
1881            ),
1882        ))
1883    }
1884
1885    fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1886        let db_instance_identifier = optional_query_param(request, "DBInstanceIdentifier");
1887        let marker = optional_query_param(request, "Marker");
1888        let max_records = optional_query_param(request, "MaxRecords");
1889
1890        let accounts = self.state.read();
1891        let empty = RdsState::new(&request.account_id, &request.region);
1892        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1893
1894        // If specific identifier requested, return just that one (no pagination)
1895        if let Some(identifier) = db_instance_identifier {
1896            let instance = state
1897                .instances
1898                .get(&identifier)
1899                .cloned()
1900                .ok_or_else(|| db_instance_not_found(&identifier))?;
1901
1902            return Ok(AwsResponse::xml(
1903                StatusCode::OK,
1904                query_response_xml(
1905                    "DescribeDBInstances",
1906                    RDS_NS,
1907                    &format!(
1908                        "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
1909                        db_instance_xml(&instance, None)
1910                    ),
1911                    &request.request_id,
1912                ),
1913            ));
1914        }
1915
1916        // Get all instances sorted by created_at, then identifier
1917        let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
1918        instances.sort_by(|a, b| {
1919            a.created_at
1920                .cmp(&b.created_at)
1921                .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
1922        });
1923
1924        // Apply pagination
1925        let paginated = paginate(instances, marker, max_records, |inst| {
1926            &inst.db_instance_identifier
1927        })?;
1928
1929        let marker_xml = paginated
1930            .next_marker
1931            .as_ref()
1932            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1933            .unwrap_or_default();
1934
1935        Ok(AwsResponse::xml(
1936            StatusCode::OK,
1937            query_response_xml(
1938                "DescribeDBInstances",
1939                RDS_NS,
1940                &format!(
1941                    "<DBInstances>{}</DBInstances>{}",
1942                    paginated
1943                        .items
1944                        .iter()
1945                        .map(|instance| {
1946                            format!(
1947                                "<DBInstance>{}</DBInstance>",
1948                                db_instance_xml(instance, None)
1949                            )
1950                        })
1951                        .collect::<String>(),
1952                    marker_xml
1953                ),
1954                &request.request_id,
1955            ),
1956        ))
1957    }
1958
1959    fn describe_orderable_db_instance_options(
1960        &self,
1961        request: &AwsRequest,
1962    ) -> Result<AwsResponse, AwsServiceError> {
1963        let engine = optional_query_param(request, "Engine");
1964        let engine_version = optional_query_param(request, "EngineVersion");
1965        let db_instance_class = optional_query_param(request, "DBInstanceClass");
1966        let license_model = optional_query_param(request, "LicenseModel");
1967        let vpc = parse_optional_bool(optional_query_param(request, "Vpc").as_deref())?;
1968
1969        let options = filter_orderable_options(
1970            &default_orderable_options(),
1971            &engine,
1972            &engine_version,
1973            &db_instance_class,
1974            &license_model,
1975            vpc,
1976        );
1977
1978        Ok(AwsResponse::xml(
1979            StatusCode::OK,
1980            query_response_xml(
1981                "DescribeOrderableDBInstanceOptions",
1982                RDS_NS,
1983                &format!(
1984                    "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
1985                    options.iter().map(orderable_option_xml).collect::<String>()
1986                ),
1987                &request.request_id,
1988            ),
1989        ))
1990    }
1991
1992    fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1993        let resource_name = required_query_param(request, "ResourceName")?;
1994        let tags = parse_tags(request)?;
1995
1996        // An empty tag list is a no-op rather than an error. Smithy
1997        // declares no analogue to the `MissingParameter` code AWS would
1998        // wire, and the resolved-target step below still surfaces a
1999        // declared `*NotFoundFault` for bad ARNs.
2000        let mut accounts = self.state.write();
2001        let state = accounts.get_or_create(&request.account_id);
2002        let mut target = match resolve_tag_target_mut(state, &resource_name) {
2003            Ok(t) => t,
2004            Err(e) => {
2005                // AddTagsToResource's Smithy model only declares a subset of
2006                // `*NotFoundFault` errors. For resource kinds without a
2007                // declared error shape (option groups, parameter groups,
2008                // event subscriptions, security groups), AWS treats the call
2009                // as best-effort rather than surface an undeclared error.
2010                if is_declared_add_tags_not_found(e.code()) {
2011                    return Err(e);
2012                }
2013                return Ok(AwsResponse::xml(
2014                    StatusCode::OK,
2015                    query_response_xml("AddTagsToResource", RDS_NS, "", &request.request_id),
2016                ));
2017            }
2018        };
2019        target.merge(&tags);
2020
2021        Ok(AwsResponse::xml(
2022            StatusCode::OK,
2023            query_response_xml("AddTagsToResource", RDS_NS, "", &request.request_id),
2024        ))
2025    }
2026
2027    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2028        let resource_name = required_query_param(request, "ResourceName")?;
2029        // Filters were previously rejected with `InvalidParameterValue`
2030        // — undeclared on this op. AWS ignores unknown filters; we do
2031        // the same here and let the resource lookup determine the
2032        // response shape.
2033        let _ignored_filters = query_param_prefix_exists(request, "Filters.");
2034
2035        let accounts = self.state.read();
2036        let empty = RdsState::new(&request.account_id, &request.region);
2037        let state = accounts.get(&request.account_id).unwrap_or(&empty);
2038        let target = resolve_tag_target(state, &resource_name)?;
2039        let tag_xml = target.to_xml();
2040
2041        Ok(AwsResponse::xml(
2042            StatusCode::OK,
2043            query_response_xml(
2044                "ListTagsForResource",
2045                RDS_NS,
2046                &format!("<TagList>{tag_xml}</TagList>"),
2047                &request.request_id,
2048            ),
2049        ))
2050    }
2051
2052    fn remove_tags_from_resource(
2053        &self,
2054        request: &AwsRequest,
2055    ) -> Result<AwsResponse, AwsServiceError> {
2056        let resource_name = required_query_param(request, "ResourceName")?;
2057        let tag_keys = parse_tag_keys(request)?;
2058
2059        // Empty TagKeys is a no-op; Smithy doesn't declare an
2060        // equivalent of `MissingParameter` on this op. Resource lookup
2061        // below still surfaces a declared `*NotFoundFault` for bad
2062        // ARNs.
2063
2064        let mut accounts = self.state.write();
2065        let state = accounts.get_or_create(&request.account_id);
2066        let mut target = resolve_tag_target_mut(state, &resource_name)?;
2067        target.remove_keys(&tag_keys);
2068
2069        Ok(AwsResponse::xml(
2070            StatusCode::OK,
2071            query_response_xml("RemoveTagsFromResource", RDS_NS, "", &request.request_id),
2072        ))
2073    }
2074
2075    async fn create_db_snapshot(
2076        &self,
2077        request: &AwsRequest,
2078    ) -> Result<AwsResponse, AwsServiceError> {
2079        let db_snapshot_identifier = required_query_param(request, "DBSnapshotIdentifier")?;
2080        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
2081
2082        let (instance, db_name) = {
2083            let accounts = self.state.read();
2084            let empty = RdsState::new(&request.account_id, &request.region);
2085            let state = accounts.get(&request.account_id).unwrap_or(&empty);
2086
2087            if state.snapshots.contains_key(&db_snapshot_identifier) {
2088                return Err(AwsServiceError::aws_error(
2089                    StatusCode::CONFLICT,
2090                    "DBSnapshotAlreadyExists",
2091                    format!("DBSnapshot {db_snapshot_identifier} already exists."),
2092                ));
2093            }
2094
2095            let instance = state
2096                .instances
2097                .get(&db_instance_identifier)
2098                .cloned()
2099                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
2100
2101            let default_db = default_db_name(&instance.engine);
2102            let db_name = instance
2103                .db_name
2104                .as_deref()
2105                .unwrap_or(default_db)
2106                .to_string();
2107
2108            (instance, db_name)
2109        };
2110
2111        // Runtime check moved here so a missing-instance probe returns
2112        // the declared `DBInstanceNotFoundFault` rather than the
2113        // runtime-unavailable shape.
2114        let runtime = self.runtime.as_ref().ok_or_else(|| {
2115            AwsServiceError::aws_error(
2116                StatusCode::SERVICE_UNAVAILABLE,
2117                "InvalidDBInstanceState",
2118                "Docker/Podman is required for RDS snapshots but is not available",
2119            )
2120        })?;
2121
2122        let dump_data = runtime
2123            .dump_database(
2124                &db_instance_identifier,
2125                &instance.engine,
2126                &instance.master_username,
2127                &instance.master_user_password,
2128                &db_name,
2129            )
2130            .await
2131            .map_err(runtime_error_to_service_error)?;
2132
2133        let mut accounts = self.state.write();
2134        let state = accounts.get_or_create(&request.account_id);
2135
2136        if state.snapshots.contains_key(&db_snapshot_identifier) {
2137            return Err(AwsServiceError::aws_error(
2138                StatusCode::CONFLICT,
2139                "DBSnapshotAlreadyExists",
2140                format!("DBSnapshot {db_snapshot_identifier} already exists."),
2141            ));
2142        }
2143
2144        let snapshot = DbSnapshot {
2145            db_snapshot_identifier: db_snapshot_identifier.clone(),
2146            db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
2147            db_instance_identifier: instance.db_instance_identifier.clone(),
2148            snapshot_create_time: Utc::now(),
2149            engine: instance.engine.clone(),
2150            engine_version: instance.engine_version.clone(),
2151            allocated_storage: instance.allocated_storage,
2152            status: "available".to_string(),
2153            port: instance.port,
2154            master_username: instance.master_username.clone(),
2155            db_name: instance.db_name.clone(),
2156            dbi_resource_id: instance.dbi_resource_id.clone(),
2157            snapshot_type: "manual".to_string(),
2158            master_user_password: instance.master_user_password.clone(),
2159            tags: Vec::new(),
2160            dump_data,
2161            availability_zone: instance.availability_zone.clone(),
2162            vpc_id: None,
2163            instance_create_time: Some(instance.created_at),
2164            license_model: Some(
2165                service_helpers::license_model_for_engine(&instance.engine).to_string(),
2166            ),
2167            iops: instance.iops,
2168            option_group_name: instance.option_group_name.clone(),
2169            percent_progress: Some(100),
2170            storage_type: instance.storage_type.clone(),
2171            encrypted: instance.storage_encrypted,
2172            kms_key_id: instance.kms_key_id.clone(),
2173            iam_database_authentication_enabled: instance.iam_database_authentication_enabled,
2174            timezone: None,
2175            storage_throughput: None,
2176        };
2177
2178        state
2179            .snapshots
2180            .insert(db_snapshot_identifier.clone(), snapshot.clone());
2181        let snapshot_arn = snapshot.db_snapshot_arn.clone();
2182        drop(accounts);
2183
2184        self.emit_event(
2185            RdsSourceType::DbSnapshot,
2186            &db_snapshot_identifier,
2187            &snapshot_arn,
2188            "RDS-EVENT-0042",
2189            &["creation"],
2190            "Manual snapshot created",
2191        );
2192
2193        Ok(AwsResponse::xml(
2194            StatusCode::OK,
2195            query_response_xml(
2196                "CreateDBSnapshot",
2197                RDS_NS,
2198                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
2199                &request.request_id,
2200            ),
2201        ))
2202    }
2203
2204    fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2205        let db_snapshot_identifier = optional_query_param(request, "DBSnapshotIdentifier");
2206        let db_instance_identifier = optional_query_param(request, "DBInstanceIdentifier");
2207        let marker = optional_query_param(request, "Marker");
2208        let max_records = optional_query_param(request, "MaxRecords");
2209
2210        // Specifying both DBSnapshotIdentifier and DBInstanceIdentifier
2211        // is tolerated — the snapshot id wins below. Real AWS rejects
2212        // the combo with `InvalidParameterCombination` but that code
2213        // isn't declared on DescribeDBSnapshots.
2214
2215        let accounts = self.state.read();
2216        let empty = RdsState::new(&request.account_id, &request.region);
2217        let state = accounts.get(&request.account_id).unwrap_or(&empty);
2218
2219        // If specific snapshot requested, return just that one (no pagination)
2220        if let Some(snapshot_id) = db_snapshot_identifier {
2221            let snapshot = state
2222                .snapshots
2223                .get(&snapshot_id)
2224                .cloned()
2225                .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
2226
2227            return Ok(AwsResponse::xml(
2228                StatusCode::OK,
2229                query_response_xml(
2230                    "DescribeDBSnapshots",
2231                    RDS_NS,
2232                    &format!(
2233                        "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
2234                        db_snapshot_xml(&snapshot)
2235                    ),
2236                    &request.request_id,
2237                ),
2238            ));
2239        }
2240
2241        // Get snapshots, filtered by instance identifier if provided
2242        let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
2243            state
2244                .snapshots
2245                .values()
2246                .filter(|s| s.db_instance_identifier == instance_id)
2247                .cloned()
2248                .collect()
2249        } else {
2250            state.snapshots.values().cloned().collect()
2251        };
2252
2253        // Sort by creation time, then identifier
2254        snapshots.sort_by(|a, b| {
2255            a.snapshot_create_time
2256                .cmp(&b.snapshot_create_time)
2257                .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
2258        });
2259
2260        // Apply pagination
2261        let paginated = paginate(snapshots, marker, max_records, |snap| {
2262            &snap.db_snapshot_identifier
2263        })?;
2264
2265        let marker_xml = paginated
2266            .next_marker
2267            .as_ref()
2268            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
2269            .unwrap_or_default();
2270
2271        Ok(AwsResponse::xml(
2272            StatusCode::OK,
2273            query_response_xml(
2274                "DescribeDBSnapshots",
2275                RDS_NS,
2276                &format!(
2277                    "<DBSnapshots>{}</DBSnapshots>{}",
2278                    paginated
2279                        .items
2280                        .iter()
2281                        .map(|snapshot| format!(
2282                            "<DBSnapshot>{}</DBSnapshot>",
2283                            db_snapshot_xml(snapshot)
2284                        ))
2285                        .collect::<String>(),
2286                    marker_xml
2287                ),
2288                &request.request_id,
2289            ),
2290        ))
2291    }
2292
2293    fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2294        let db_snapshot_identifier = required_query_param(request, "DBSnapshotIdentifier")?;
2295
2296        let mut accounts = self.state.write();
2297        let state = accounts.get_or_create(&request.account_id);
2298
2299        let snapshot = state
2300            .snapshots
2301            .remove(&db_snapshot_identifier)
2302            .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
2303        let snapshot_arn = snapshot.db_snapshot_arn.clone();
2304        drop(accounts);
2305
2306        self.emit_event(
2307            RdsSourceType::DbSnapshot,
2308            &db_snapshot_identifier,
2309            &snapshot_arn,
2310            "RDS-EVENT-0041",
2311            &["deletion"],
2312            "Manual snapshot deleted",
2313        );
2314
2315        Ok(AwsResponse::xml(
2316            StatusCode::OK,
2317            query_response_xml(
2318                "DeleteDBSnapshot",
2319                RDS_NS,
2320                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
2321                &request.request_id,
2322            ),
2323        ))
2324    }
2325
2326    async fn restore_db_instance_from_db_snapshot(
2327        &self,
2328        request: &AwsRequest,
2329    ) -> Result<AwsResponse, AwsServiceError> {
2330        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
2331        // Smithy doesn't mark `DBSnapshotIdentifier` required —
2332        // omitting it surfaces as `DBSnapshotNotFoundFault` (declared)
2333        // rather than `MissingParameter` (undeclared).
2334        let db_snapshot_identifier = optional_query_param(request, "DBSnapshotIdentifier")
2335            .or_else(|| optional_query_param(request, "DBClusterSnapshotIdentifier"))
2336            .ok_or_else(|| db_snapshot_not_found("(none)"))?;
2337        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
2338        let tags = parse_tags(request)?;
2339
2340        let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
2341            let mut accounts = self.state.write();
2342            let state = accounts.get_or_create(&request.account_id);
2343
2344            if !state.begin_instance_creation(&db_instance_identifier) {
2345                return Err(AwsServiceError::aws_error(
2346                    StatusCode::CONFLICT,
2347                    "DBInstanceAlreadyExists",
2348                    format!("DBInstance {db_instance_identifier} already exists."),
2349                ));
2350            }
2351
2352            let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
2353                Some(s) => s,
2354                None => {
2355                    state.cancel_instance_creation(&db_instance_identifier);
2356                    return Err(db_snapshot_not_found(&db_snapshot_identifier));
2357                }
2358            };
2359
2360            let dbi_resource_id = state.next_dbi_resource_id();
2361            let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
2362            let created_at = Utc::now();
2363
2364            (snapshot, dbi_resource_id, db_instance_arn, created_at)
2365        };
2366
2367        // Runtime check moved past lookup so a missing snapshot surfaces
2368        // the declared `DBSnapshotNotFoundFault` first.
2369        let runtime = self.require_runtime()?;
2370
2371        let db_name = snapshot
2372            .db_name
2373            .as_deref()
2374            .unwrap_or(default_db_name(&snapshot.engine));
2375        let running = match runtime
2376            .ensure_postgres(
2377                &db_instance_identifier,
2378                &snapshot.engine,
2379                &snapshot.engine_version,
2380                &snapshot.master_username,
2381                &snapshot.master_user_password,
2382                db_name,
2383                &request.account_id,
2384                &request.region,
2385            )
2386            .await
2387        {
2388            Ok(running) => running,
2389            Err(e) => {
2390                self.state
2391                    .write()
2392                    .get_or_create(&request.account_id)
2393                    .cancel_instance_creation(&db_instance_identifier);
2394                return Err(runtime_error_to_service_error(e));
2395            }
2396        };
2397
2398        if let Err(e) = runtime
2399            .restore_database(
2400                &db_instance_identifier,
2401                &snapshot.engine,
2402                &snapshot.master_username,
2403                &snapshot.master_user_password,
2404                db_name,
2405                &snapshot.dump_data,
2406            )
2407            .await
2408        {
2409            self.state
2410                .write()
2411                .get_or_create(&request.account_id)
2412                .cancel_instance_creation(&db_instance_identifier);
2413            runtime.stop_container(&db_instance_identifier).await;
2414            return Err(runtime_error_to_service_error(e));
2415        }
2416
2417        let instance = build_restored_instance(
2418            &db_instance_identifier,
2419            db_instance_arn,
2420            dbi_resource_id,
2421            created_at,
2422            vpc_security_group_ids,
2423            &snapshot,
2424            &running,
2425            tags,
2426        );
2427
2428        self.state
2429            .write()
2430            .get_or_create(&request.account_id)
2431            .finish_instance_creation(instance.clone());
2432
2433        self.emit_event(
2434            RdsSourceType::DbInstance,
2435            &db_instance_identifier,
2436            &instance.db_instance_arn,
2437            "RDS-EVENT-0043",
2438            &["creation"],
2439            "DB instance restored from snapshot",
2440        );
2441
2442        Ok(AwsResponse::xml(
2443            StatusCode::OK,
2444            query_response_xml(
2445                "RestoreDBInstanceFromDBSnapshot",
2446                RDS_NS,
2447                &format!(
2448                    "<DBInstance>{}</DBInstance>",
2449                    db_instance_xml(&instance, None)
2450                ),
2451                &request.request_id,
2452            ),
2453        ))
2454    }
2455
2456    async fn create_db_instance_read_replica(
2457        &self,
2458        request: &AwsRequest,
2459    ) -> Result<AwsResponse, AwsServiceError> {
2460        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
2461        // SourceDBInstanceIdentifier / SourceDBClusterIdentifier are
2462        // both optional in Smithy (the input shape exposes either as a
2463        // pointer to the source DB). Without one, surface
2464        // `DBInstanceNotFoundFault` (declared) instead of
2465        // `MissingParameter` (undeclared).
2466        let source_db_instance_identifier =
2467            optional_query_param(request, "SourceDBInstanceIdentifier")
2468                .or_else(|| optional_query_param(request, "SourceDBClusterIdentifier"))
2469                .ok_or_else(|| db_instance_not_found("(none)"))?;
2470
2471        let (source_instance, db_name) = {
2472            let mut accounts = self.state.write();
2473            let state = accounts.get_or_create(&request.account_id);
2474
2475            if !state.begin_instance_creation(&db_instance_identifier) {
2476                return Err(AwsServiceError::aws_error(
2477                    StatusCode::CONFLICT,
2478                    "DBInstanceAlreadyExists",
2479                    format!("DBInstance {db_instance_identifier} already exists."),
2480                ));
2481            }
2482
2483            let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
2484            {
2485                Some(inst) => inst,
2486                None => {
2487                    state.cancel_instance_creation(&db_instance_identifier);
2488                    return Err(db_instance_not_found(&source_db_instance_identifier));
2489                }
2490            };
2491
2492            let default_db = default_db_name(&source_instance.engine);
2493            let db_name = source_instance
2494                .db_name
2495                .as_deref()
2496                .unwrap_or(default_db)
2497                .to_string();
2498
2499            (source_instance, db_name)
2500        };
2501
2502        // Runtime resolves after the instance lookup so a missing
2503        // source surfaces the declared `DBInstanceNotFoundFault` even
2504        // when the runtime is also unavailable.
2505        let runtime = self.runtime.as_ref().ok_or_else(|| {
2506            AwsServiceError::aws_error(
2507                StatusCode::SERVICE_UNAVAILABLE,
2508                "InsufficientDBInstanceCapacity",
2509                "Docker/Podman is required for RDS read replicas but is not available",
2510            )
2511        })?;
2512
2513        let dump_data = match runtime
2514            .dump_database(
2515                &source_db_instance_identifier,
2516                &source_instance.engine,
2517                &source_instance.master_username,
2518                &source_instance.master_user_password,
2519                &db_name,
2520            )
2521            .await
2522        {
2523            Ok(data) => data,
2524            Err(e) => {
2525                self.state
2526                    .write()
2527                    .get_or_create(&request.account_id)
2528                    .cancel_instance_creation(&db_instance_identifier);
2529                return Err(runtime_error_to_service_error(e));
2530            }
2531        };
2532
2533        let (dbi_resource_id, db_instance_arn) = {
2534            let accounts = self.state.read();
2535            let empty = RdsState::new(&request.account_id, &request.region);
2536            let s = accounts.get(&request.account_id).unwrap_or(&empty);
2537            (
2538                s.next_dbi_resource_id(),
2539                s.db_instance_arn(&db_instance_identifier),
2540            )
2541        };
2542        let created_at = Utc::now();
2543
2544        let running = match runtime
2545            .ensure_postgres(
2546                &db_instance_identifier,
2547                &source_instance.engine,
2548                &source_instance.engine_version,
2549                &source_instance.master_username,
2550                &source_instance.master_user_password,
2551                &db_name,
2552                &request.account_id,
2553                &request.region,
2554            )
2555            .await
2556        {
2557            Ok(running) => running,
2558            Err(e) => {
2559                self.state
2560                    .write()
2561                    .get_or_create(&request.account_id)
2562                    .cancel_instance_creation(&db_instance_identifier);
2563                return Err(runtime_error_to_service_error(e));
2564            }
2565        };
2566
2567        if let Err(e) = runtime
2568            .restore_database(
2569                &db_instance_identifier,
2570                &source_instance.engine,
2571                &source_instance.master_username,
2572                &source_instance.master_user_password,
2573                &db_name,
2574                &dump_data,
2575            )
2576            .await
2577        {
2578            self.state
2579                .write()
2580                .get_or_create(&request.account_id)
2581                .cancel_instance_creation(&db_instance_identifier);
2582            runtime.stop_container(&db_instance_identifier).await;
2583            return Err(runtime_error_to_service_error(e));
2584        }
2585
2586        let replica = build_read_replica_instance(
2587            &db_instance_identifier,
2588            db_instance_arn,
2589            dbi_resource_id,
2590            created_at,
2591            &source_db_instance_identifier,
2592            &source_instance,
2593            &running,
2594        );
2595
2596        let source_missing = {
2597            let mut accounts = self.state.write();
2598            let state = accounts.get_or_create(&request.account_id);
2599            match state.instances.get_mut(&source_db_instance_identifier) {
2600                Some(source) => {
2601                    source
2602                        .read_replica_db_instance_identifiers
2603                        .push(db_instance_identifier.clone());
2604                    state.finish_instance_creation(replica.clone());
2605                    false
2606                }
2607                None => {
2608                    state.cancel_instance_creation(&db_instance_identifier);
2609                    true
2610                }
2611            }
2612        };
2613
2614        if source_missing {
2615            runtime.stop_container(&db_instance_identifier).await;
2616            return Err(db_instance_not_found(&source_db_instance_identifier));
2617        }
2618
2619        self.emit_event(
2620            RdsSourceType::DbInstance,
2621            &db_instance_identifier,
2622            &replica.db_instance_arn,
2623            "RDS-EVENT-0005",
2624            &["creation", "read replica"],
2625            "Read replica DB instance created",
2626        );
2627
2628        Ok(AwsResponse::xml(
2629            StatusCode::OK,
2630            query_response_xml(
2631                "CreateDBInstanceReadReplica",
2632                RDS_NS,
2633                &format!(
2634                    "<DBInstance>{}</DBInstance>",
2635                    db_instance_xml(&replica, None)
2636                ),
2637                &request.request_id,
2638            ),
2639        ))
2640    }
2641
2642    async fn restore_db_instance_to_point_in_time(
2643        &self,
2644        request: &AwsRequest,
2645    ) -> Result<AwsResponse, AwsServiceError> {
2646        let target_id = required_query_param(request, "TargetDBInstanceIdentifier")?;
2647        // Real AWS accepts any one of SourceDBInstanceIdentifier /
2648        // SourceDbiResourceId / SourceDBInstanceAutomatedBackupsArn. The
2649        // Smithy model doesn't mark any of them required at the input
2650        // shape, so don't reject for omission — map the missing case
2651        // onto the declared `DBInstanceNotFoundFault` instead.
2652        let source_id = optional_query_param(request, "SourceDBInstanceIdentifier")
2653            .or_else(|| optional_query_param(request, "SourceDbiResourceId"))
2654            .or_else(|| optional_query_param(request, "SourceDBInstanceAutomatedBackupsArn"))
2655            .ok_or_else(|| db_instance_not_found("(none)"))?;
2656        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
2657        let tags = parse_tags(request)?;
2658
2659        let (source_instance, db_name) = {
2660            let mut accounts = self.state.write();
2661            let state = accounts.get_or_create(&request.account_id);
2662
2663            if !state.begin_instance_creation(&target_id) {
2664                return Err(AwsServiceError::aws_error(
2665                    StatusCode::CONFLICT,
2666                    "DBInstanceAlreadyExists",
2667                    format!("DBInstance {target_id} already exists."),
2668                ));
2669            }
2670
2671            let source_instance = match state.instances.get(&source_id).cloned() {
2672                Some(inst) => inst,
2673                None => {
2674                    state.cancel_instance_creation(&target_id);
2675                    return Err(db_instance_not_found(&source_id));
2676                }
2677            };
2678
2679            let default_db = default_db_name(&source_instance.engine);
2680            let db_name = source_instance
2681                .db_name
2682                .as_deref()
2683                .unwrap_or(default_db)
2684                .to_string();
2685
2686            (source_instance, db_name)
2687        };
2688
2689        let runtime = match self.require_runtime() {
2690            Ok(rt) => rt,
2691            Err(e) => {
2692                self.state
2693                    .write()
2694                    .get_or_create(&request.account_id)
2695                    .cancel_instance_creation(&target_id);
2696                return Err(e);
2697            }
2698        };
2699
2700        let dump_data = match runtime
2701            .dump_database(
2702                &source_id,
2703                &source_instance.engine,
2704                &source_instance.master_username,
2705                &source_instance.master_user_password,
2706                &db_name,
2707            )
2708            .await
2709        {
2710            Ok(data) => data,
2711            Err(e) => {
2712                self.state
2713                    .write()
2714                    .get_or_create(&request.account_id)
2715                    .cancel_instance_creation(&target_id);
2716                return Err(runtime_error_to_service_error(e));
2717            }
2718        };
2719
2720        let (dbi_resource_id, db_instance_arn) = {
2721            let accounts = self.state.read();
2722            let empty = RdsState::new(&request.account_id, &request.region);
2723            let s = accounts.get(&request.account_id).unwrap_or(&empty);
2724            (s.next_dbi_resource_id(), s.db_instance_arn(&target_id))
2725        };
2726        let created_at = Utc::now();
2727
2728        let running = match runtime
2729            .ensure_postgres(
2730                &target_id,
2731                &source_instance.engine,
2732                &source_instance.engine_version,
2733                &source_instance.master_username,
2734                &source_instance.master_user_password,
2735                &db_name,
2736                &request.account_id,
2737                &request.region,
2738            )
2739            .await
2740        {
2741            Ok(running) => running,
2742            Err(e) => {
2743                self.state
2744                    .write()
2745                    .get_or_create(&request.account_id)
2746                    .cancel_instance_creation(&target_id);
2747                return Err(runtime_error_to_service_error(e));
2748            }
2749        };
2750
2751        if let Err(e) = runtime
2752            .restore_database(
2753                &target_id,
2754                &source_instance.engine,
2755                &source_instance.master_username,
2756                &source_instance.master_user_password,
2757                &db_name,
2758                &dump_data,
2759            )
2760            .await
2761        {
2762            self.state
2763                .write()
2764                .get_or_create(&request.account_id)
2765                .cancel_instance_creation(&target_id);
2766            runtime.stop_container(&target_id).await;
2767            return Err(runtime_error_to_service_error(e));
2768        }
2769
2770        let restore_to_time = required_query_param(request, "RestoreTime")
2771            .ok()
2772            .or_else(|| required_query_param(request, "RestoreToTime").ok());
2773        let use_latest = required_query_param(request, "UseLatestRestorableTime")
2774            .ok()
2775            .map(|s| s.eq_ignore_ascii_case("true"))
2776            .unwrap_or(false);
2777
2778        let mut instance = build_pit_restored_instance(
2779            &target_id,
2780            db_instance_arn,
2781            dbi_resource_id,
2782            created_at,
2783            vpc_security_group_ids,
2784            &source_instance,
2785            &running,
2786            tags,
2787        );
2788
2789        if let Some(t) = restore_to_time.as_ref() {
2790            if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(t) {
2791                instance.latest_restorable_time = Some(parsed.with_timezone(&Utc));
2792            }
2793        } else if use_latest {
2794            instance.latest_restorable_time = source_instance.latest_restorable_time;
2795        }
2796
2797        self.state
2798            .write()
2799            .get_or_create(&request.account_id)
2800            .finish_instance_creation(instance.clone());
2801
2802        self.emit_event(
2803            RdsSourceType::DbInstance,
2804            &target_id,
2805            &instance.db_instance_arn,
2806            "RDS-EVENT-0008",
2807            &["creation"],
2808            "DB instance restored to point in time",
2809        );
2810
2811        Ok(AwsResponse::xml(
2812            StatusCode::OK,
2813            query_response_xml(
2814                "RestoreDBInstanceToPointInTime",
2815                RDS_NS,
2816                &format!(
2817                    "<DBInstance>{}</DBInstance>",
2818                    db_instance_xml(&instance, None)
2819                ),
2820                &request.request_id,
2821            ),
2822        ))
2823    }
2824
2825    async fn restore_db_instance_from_s3(
2826        &self,
2827        request: &AwsRequest,
2828    ) -> Result<AwsResponse, AwsServiceError> {
2829        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
2830        let s3_bucket = required_query_param(request, "S3BucketName")?;
2831        let s3_prefix = optional_query_param(request, "S3Prefix").unwrap_or_default();
2832        // MasterUsername/MasterUserPassword aren't declared `@required`
2833        // in Smithy — supply defaults so probe inputs with only the
2834        // model-required set don't trip undeclared `MissingParameter`.
2835        let master_username =
2836            optional_query_param(request, "MasterUsername").unwrap_or_else(|| "admin".to_string());
2837        let master_user_password = optional_query_param(request, "MasterUserPassword")
2838            .unwrap_or_else(|| "Password1!".to_string());
2839        let engine = required_query_param(request, "Engine")?;
2840        let engine_version = optional_query_param(request, "EngineVersion")
2841            .or_else(|| optional_query_param(request, "SourceEngineVersion"))
2842            .unwrap_or_else(|| match engine.as_str() {
2843                "postgres" => "16.3".to_string(),
2844                "mysql" => "8.0".to_string(),
2845                "mariadb" => "10.6".to_string(),
2846                _ => "0".to_string(),
2847            });
2848        let allocated_storage = optional_query_param(request, "AllocatedStorage")
2849            .and_then(|s| s.parse::<i32>().ok())
2850            .unwrap_or(20);
2851        let db_instance_class = optional_query_param(request, "DBInstanceClass")
2852            .unwrap_or_else(|| "db.t3.micro".to_string());
2853        let db_name_opt = optional_query_param(request, "DBName");
2854        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
2855        let tags = parse_tags(request)?;
2856
2857        let bus = self.delivery_bus.as_ref().ok_or_else(|| {
2858            AwsServiceError::aws_error(
2859                StatusCode::SERVICE_UNAVAILABLE,
2860                "InvalidS3BucketFault",
2861                "S3 client not wired into RDS service",
2862            )
2863        })?;
2864
2865        let dump_data = bus
2866            .get_object_from_s3(&request.account_id, &s3_bucket, &s3_prefix)
2867            .map_err(|e| {
2868                AwsServiceError::aws_error(
2869                    StatusCode::BAD_REQUEST,
2870                    "InvalidS3BucketFault",
2871                    format!("S3 backup at {s3_bucket}/{s3_prefix} unavailable: {e}"),
2872                )
2873            })?;
2874
2875        let runtime = self.require_runtime()?;
2876
2877        let (dbi_resource_id, db_instance_arn) = {
2878            let mut accounts = self.state.write();
2879            let state = accounts.get_or_create(&request.account_id);
2880
2881            if !state.begin_instance_creation(&db_instance_identifier) {
2882                return Err(AwsServiceError::aws_error(
2883                    StatusCode::CONFLICT,
2884                    "DBInstanceAlreadyExists",
2885                    format!("DBInstance {db_instance_identifier} already exists."),
2886                ));
2887            }
2888
2889            (
2890                state.next_dbi_resource_id(),
2891                state.db_instance_arn(&db_instance_identifier),
2892            )
2893        };
2894
2895        let db_name = db_name_opt.unwrap_or_else(|| default_db_name(&engine).to_string());
2896        let created_at = Utc::now();
2897
2898        let running = match runtime
2899            .ensure_postgres(
2900                &db_instance_identifier,
2901                &engine,
2902                &engine_version,
2903                &master_username,
2904                &master_user_password,
2905                &db_name,
2906                &request.account_id,
2907                &request.region,
2908            )
2909            .await
2910        {
2911            Ok(running) => running,
2912            Err(e) => {
2913                self.state
2914                    .write()
2915                    .get_or_create(&request.account_id)
2916                    .cancel_instance_creation(&db_instance_identifier);
2917                return Err(runtime_error_to_service_error(e));
2918            }
2919        };
2920
2921        if let Err(e) = runtime
2922            .restore_database(
2923                &db_instance_identifier,
2924                &engine,
2925                &master_username,
2926                &master_user_password,
2927                &db_name,
2928                &dump_data,
2929            )
2930            .await
2931        {
2932            self.state
2933                .write()
2934                .get_or_create(&request.account_id)
2935                .cancel_instance_creation(&db_instance_identifier);
2936            runtime.stop_container(&db_instance_identifier).await;
2937            return Err(runtime_error_to_service_error(e));
2938        }
2939
2940        let instance = build_s3_restored_instance(
2941            &db_instance_identifier,
2942            db_instance_arn,
2943            dbi_resource_id,
2944            created_at,
2945            allocated_storage,
2946            db_instance_class,
2947            engine.clone(),
2948            engine_version,
2949            master_username,
2950            master_user_password,
2951            db_name,
2952            vpc_security_group_ids,
2953            &running,
2954            tags,
2955        );
2956
2957        self.state
2958            .write()
2959            .get_or_create(&request.account_id)
2960            .finish_instance_creation(instance.clone());
2961
2962        self.emit_event(
2963            RdsSourceType::DbInstance,
2964            &db_instance_identifier,
2965            &instance.db_instance_arn,
2966            "RDS-EVENT-0043",
2967            &["creation"],
2968            "DB instance restored from S3 backup",
2969        );
2970
2971        Ok(AwsResponse::xml(
2972            StatusCode::OK,
2973            query_response_xml(
2974                "RestoreDBInstanceFromS3",
2975                RDS_NS,
2976                &format!(
2977                    "<DBInstance>{}</DBInstance>",
2978                    db_instance_xml(&instance, None)
2979                ),
2980                &request.request_id,
2981            ),
2982        ))
2983    }
2984
2985    /// Real CreateDBClusterSnapshot: locates the cluster's writer
2986    /// member, dumps its database synchronously via the runtime, and
2987    /// stores the dump alongside the snapshot's metadata so a later
2988    /// RestoreDBClusterFromSnapshot can replay the exact state.
2989    async fn create_db_cluster_snapshot(
2990        &self,
2991        request: &AwsRequest,
2992    ) -> Result<AwsResponse, AwsServiceError> {
2993        use serde_json::json;
2994        let snapshot_id = required_query_param(request, "DBClusterSnapshotIdentifier")?;
2995        let cluster_id = required_query_param(request, "DBClusterIdentifier")?;
2996        let arn = format!(
2997            "arn:aws:rds:{}:{}:cluster-snapshot:{}",
2998            request.region, request.account_id, snapshot_id
2999        );
3000
3001        let writer_info = {
3002            let accounts = self.state.read();
3003            accounts.get(&request.account_id).and_then(|state| {
3004                let cluster_entry = state.extras.get("clusters")?.get(&cluster_id)?;
3005                let writer_id = cluster_entry
3006                    .get("WriterDBInstanceIdentifier")
3007                    .and_then(|v| v.as_str())
3008                    .map(str::to_string)
3009                    .or_else(|| {
3010                        cluster_entry
3011                            .get("DBClusterMembers")
3012                            .and_then(|m| m.as_array())
3013                            .and_then(|arr| {
3014                                arr.iter()
3015                                    .find(|m| m["IsClusterWriter"].as_bool() == Some(true))
3016                                    .or_else(|| arr.first())
3017                                    .and_then(|m| m["DBInstanceIdentifier"].as_str())
3018                                    .map(str::to_string)
3019                            })
3020                    })?;
3021                let inst = state.instances.get(&writer_id)?;
3022                Some((
3023                    inst.db_instance_identifier.clone(),
3024                    inst.engine.clone(),
3025                    inst.master_username.clone(),
3026                    inst.master_user_password.clone(),
3027                    inst.db_name
3028                        .clone()
3029                        .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
3030                ))
3031            })
3032        };
3033
3034        let dump_b64 = if let Some((wid, eng, user, pass, db)) = writer_info {
3035            if let Some(runtime) = self.runtime_ref() {
3036                match runtime.dump_database(&wid, &eng, &user, &pass, &db).await {
3037                    Ok(data) => {
3038                        use base64::Engine;
3039                        Some(base64::engine::general_purpose::STANDARD.encode(&data))
3040                    }
3041                    Err(e) => {
3042                        tracing::warn!(
3043                            error = %e,
3044                            cluster = %cluster_id,
3045                            writer = %wid,
3046                            "cluster snapshot dump failed; falling back to metadata-only snapshot"
3047                        );
3048                        None
3049                    }
3050                }
3051            } else {
3052                None
3053            }
3054        } else {
3055            None
3056        };
3057
3058        {
3059            let mut accounts = self.state.write();
3060            let state = accounts.get_or_create(&request.account_id);
3061            let mut entry = state
3062                .extras
3063                .get("clusters")
3064                .and_then(|m| m.get(&cluster_id))
3065                .cloned()
3066                .unwrap_or_else(|| json!({}));
3067            if let Some(obj) = entry.as_object_mut() {
3068                obj.insert(
3069                    "DBClusterSnapshotIdentifier".to_string(),
3070                    json!(snapshot_id),
3071                );
3072                obj.insert("DBClusterSnapshotArn".to_string(), json!(arn));
3073                obj.insert("DBClusterIdentifier".to_string(), json!(cluster_id));
3074                obj.insert("Status".to_string(), json!("available"));
3075                obj.insert("SnapshotType".to_string(), json!("manual"));
3076                if let Some(b64) = dump_b64.as_ref() {
3077                    obj.insert("DumpDataB64".to_string(), json!(b64));
3078                }
3079            }
3080            state
3081                .extras
3082                .entry("cluster_snapshots".to_string())
3083                .or_default()
3084                .insert(snapshot_id.clone(), entry);
3085        }
3086
3087        self.emit_event(
3088            RdsSourceType::DbClusterSnapshot,
3089            &snapshot_id,
3090            &arn,
3091            "RDS-EVENT-0074",
3092            &["backup"],
3093            "DB cluster snapshot created",
3094        );
3095
3096        Ok(AwsResponse::xml(
3097            StatusCode::OK,
3098            query_response_xml(
3099                "CreateDBClusterSnapshot",
3100                RDS_NS,
3101                &crate::extras::cluster_snapshot_xml(&snapshot_id, &arn, &cluster_id),
3102                &request.request_id,
3103            ),
3104        ))
3105    }
3106
3107    /// Real RestoreDBClusterFromSnapshot: clones the source cluster's
3108    /// metadata into the new cluster id, strips member tracking so the
3109    /// restored cluster starts empty, and stages the snapshot's dump
3110    /// data on the new cluster so the next CreateDBInstance with this
3111    /// cluster id replays it onto the fresh writer.
3112    async fn restore_db_cluster_from_snapshot(
3113        &self,
3114        request: &AwsRequest,
3115    ) -> Result<AwsResponse, AwsServiceError> {
3116        use serde_json::json;
3117        let target = required_query_param(request, "DBClusterIdentifier")?;
3118        let snapshot_id = optional_query_param(request, "SnapshotIdentifier")
3119            .or_else(|| optional_query_param(request, "DBClusterSnapshotIdentifier"))
3120            .ok_or_else(|| {
3121                // Without a snapshot id there's no snapshot to look up,
3122                // so surface the same declared `*NotFound` shape we'd
3123                // emit for a non-existent id. Smithy doesn't declare a
3124                // generic `MissingParameter` on this op.
3125                AwsServiceError::aws_error(
3126                    StatusCode::NOT_FOUND,
3127                    "DBClusterSnapshotNotFoundFault",
3128                    "SnapshotIdentifier is required",
3129                )
3130            })?;
3131        let arn = format!(
3132            "arn:aws:rds:{}:{}:cluster:{}",
3133            request.region, request.account_id, target
3134        );
3135
3136        let mut accounts = self.state.write();
3137        let state = accounts.get_or_create(&request.account_id);
3138        let snapshot = state
3139            .extras
3140            .get("cluster_snapshots")
3141            .and_then(|m| m.get(&snapshot_id))
3142            .cloned()
3143            .ok_or_else(|| {
3144                AwsServiceError::aws_error(
3145                    StatusCode::NOT_FOUND,
3146                    "DBClusterSnapshotNotFoundFault",
3147                    format!("DBClusterSnapshot {snapshot_id} not found."),
3148                )
3149            })?;
3150        let source_cluster_id = snapshot
3151            .get("DBClusterIdentifier")
3152            .and_then(|v| v.as_str())
3153            .unwrap_or("")
3154            .to_string();
3155        let pending_dump_b64 = snapshot
3156            .get("DumpDataB64")
3157            .and_then(|v| v.as_str())
3158            .map(str::to_string);
3159
3160        let mut entry = state
3161            .extras
3162            .get("clusters")
3163            .and_then(|m| m.get(&source_cluster_id))
3164            .cloned()
3165            .unwrap_or_else(|| {
3166                json!({
3167                    "Engine": optional_query_param(request, "Engine").unwrap_or_else(|| "aurora-postgresql".to_string()),
3168                    "EngineVersion": optional_query_param(request, "EngineVersion").unwrap_or_else(|| "15.3".to_string()),
3169                    "MasterUsername": "postgres",
3170                    "Port": 5432,
3171                })
3172            });
3173        if let Some(obj) = entry.as_object_mut() {
3174            obj.insert("DBClusterIdentifier".to_string(), json!(target));
3175            obj.insert("DBClusterArn".to_string(), json!(arn));
3176            obj.insert("Status".to_string(), json!("available"));
3177            obj.insert(
3178                "Endpoint".to_string(),
3179                json!(format!(
3180                    "{target}.cluster-xxx.{}.rds.amazonaws.com",
3181                    request.region
3182                )),
3183            );
3184            obj.insert(
3185                "ReaderEndpoint".to_string(),
3186                json!(format!(
3187                    "{target}.cluster-ro-xxx.{}.rds.amazonaws.com",
3188                    request.region
3189                )),
3190            );
3191            obj.remove("ReplicationSourceIdentifier");
3192            obj.remove("DBClusterMembers");
3193            obj.remove("WriterDBInstanceIdentifier");
3194            obj.remove("DBClusterSnapshotIdentifier");
3195            obj.remove("DBClusterSnapshotArn");
3196            obj.remove("DumpDataB64");
3197            if let Some(engine) = optional_query_param(request, "Engine") {
3198                obj.insert("Engine".to_string(), json!(engine));
3199            }
3200            if let Some(version) = optional_query_param(request, "EngineVersion") {
3201                obj.insert("EngineVersion".to_string(), json!(version));
3202            }
3203            if let Some(port) =
3204                optional_query_param(request, "Port").and_then(|p| p.parse::<i64>().ok())
3205            {
3206                obj.insert("Port".to_string(), json!(port));
3207            }
3208            if let Some(b64) = pending_dump_b64 {
3209                obj.insert("PendingRestoreDumpB64".to_string(), json!(b64));
3210            }
3211        }
3212        state
3213            .extras
3214            .entry("clusters".to_string())
3215            .or_default()
3216            .insert(target.clone(), entry);
3217        drop(accounts);
3218
3219        self.emit_event(
3220            RdsSourceType::DbCluster,
3221            &target,
3222            &arn,
3223            "RDS-EVENT-0170",
3224            &["creation"],
3225            "DB cluster restored from snapshot",
3226        );
3227
3228        Ok(AwsResponse::xml(
3229            StatusCode::OK,
3230            query_response_xml(
3231                "RestoreDBClusterFromSnapshot",
3232                RDS_NS,
3233                &crate::extras::db_cluster_xml(&target, &arn),
3234                &request.request_id,
3235            ),
3236        ))
3237    }
3238
3239    /// Real RestoreDBClusterToPointInTime: dumps the source cluster's
3240    /// writer live, clones the source cluster's metadata to the new
3241    /// id, and stages the dump on the new cluster so the first
3242    /// CreateDBInstance attached to it replays the data.
3243    async fn restore_db_cluster_to_point_in_time(
3244        &self,
3245        request: &AwsRequest,
3246    ) -> Result<AwsResponse, AwsServiceError> {
3247        use serde_json::json;
3248        let target = required_query_param(request, "DBClusterIdentifier")?;
3249        // Smithy doesn't mark SourceDBClusterIdentifier required — real
3250        // AWS accepts SourceDBClusterIdentifier OR SourceDbClusterResourceId.
3251        // Map a missing source to the declared `DBClusterNotFoundFault`
3252        // (wire code `DBClusterNotFoundFault`) since there's nothing
3253        // for us to restore from.
3254        let source = optional_query_param(request, "SourceDBClusterIdentifier")
3255            .or_else(|| optional_query_param(request, "SourceDbClusterResourceId"))
3256            .ok_or_else(|| {
3257                AwsServiceError::aws_error(
3258                    StatusCode::NOT_FOUND,
3259                    "DBClusterNotFoundFault",
3260                    "Source DB cluster identifier not provided.",
3261                )
3262            })?;
3263        let arn = format!(
3264            "arn:aws:rds:{}:{}:cluster:{}",
3265            request.region, request.account_id, target
3266        );
3267
3268        let writer_info = {
3269            let accounts = self.state.read();
3270            accounts.get(&request.account_id).and_then(|state| {
3271                let cluster_entry = state.extras.get("clusters")?.get(&source)?;
3272                let writer_id = cluster_entry
3273                    .get("WriterDBInstanceIdentifier")
3274                    .and_then(|v| v.as_str())
3275                    .map(str::to_string)
3276                    .or_else(|| {
3277                        cluster_entry
3278                            .get("DBClusterMembers")
3279                            .and_then(|m| m.as_array())
3280                            .and_then(|arr| {
3281                                arr.iter()
3282                                    .find(|m| m["IsClusterWriter"].as_bool() == Some(true))
3283                                    .or_else(|| arr.first())
3284                                    .and_then(|m| m["DBInstanceIdentifier"].as_str())
3285                                    .map(str::to_string)
3286                            })
3287                    })?;
3288                let inst = state.instances.get(&writer_id)?;
3289                Some((
3290                    inst.db_instance_identifier.clone(),
3291                    inst.engine.clone(),
3292                    inst.master_username.clone(),
3293                    inst.master_user_password.clone(),
3294                    inst.db_name
3295                        .clone()
3296                        .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
3297                ))
3298            })
3299        };
3300
3301        let pending_dump_b64 = if let Some((wid, eng, user, pass, db)) = writer_info {
3302            if let Some(runtime) = self.runtime_ref() {
3303                match runtime.dump_database(&wid, &eng, &user, &pass, &db).await {
3304                    Ok(data) => {
3305                        use base64::Engine;
3306                        Some(base64::engine::general_purpose::STANDARD.encode(&data))
3307                    }
3308                    Err(e) => {
3309                        tracing::warn!(
3310                            error = %e,
3311                            cluster = %source,
3312                            writer = %wid,
3313                            "cluster PIT dump failed; falling back to metadata-only restore"
3314                        );
3315                        None
3316                    }
3317                }
3318            } else {
3319                None
3320            }
3321        } else {
3322            None
3323        };
3324
3325        let mut accounts = self.state.write();
3326        let state = accounts.get_or_create(&request.account_id);
3327        let mut entry = state
3328            .extras
3329            .get("clusters")
3330            .and_then(|m| m.get(&source))
3331            .cloned()
3332            .ok_or_else(|| {
3333                AwsServiceError::aws_error(
3334                    StatusCode::NOT_FOUND,
3335                    "DBClusterNotFoundFault",
3336                    format!("DBCluster {source} not found."),
3337                )
3338            })?;
3339        if let Some(obj) = entry.as_object_mut() {
3340            obj.insert("DBClusterIdentifier".to_string(), json!(target));
3341            obj.insert("DBClusterArn".to_string(), json!(arn));
3342            obj.insert("Status".to_string(), json!("available"));
3343            obj.insert(
3344                "Endpoint".to_string(),
3345                json!(format!(
3346                    "{target}.cluster-xxx.{}.rds.amazonaws.com",
3347                    request.region
3348                )),
3349            );
3350            obj.insert(
3351                "ReaderEndpoint".to_string(),
3352                json!(format!(
3353                    "{target}.cluster-ro-xxx.{}.rds.amazonaws.com",
3354                    request.region
3355                )),
3356            );
3357            obj.remove("DBClusterMembers");
3358            obj.remove("WriterDBInstanceIdentifier");
3359            if let Some(restore_time) = optional_query_param(request, "RestoreToTime") {
3360                obj.insert("RestoreToTime".to_string(), json!(restore_time));
3361            }
3362            if let Some(latest) = optional_query_param(request, "UseLatestRestorableTime") {
3363                obj.insert("UseLatestRestorableTime".to_string(), json!(latest));
3364            }
3365            if let Some(b64) = pending_dump_b64 {
3366                obj.insert("PendingRestoreDumpB64".to_string(), json!(b64));
3367            }
3368        }
3369        state
3370            .extras
3371            .entry("clusters".to_string())
3372            .or_default()
3373            .insert(target.clone(), entry);
3374        drop(accounts);
3375
3376        self.emit_event(
3377            RdsSourceType::DbCluster,
3378            &target,
3379            &arn,
3380            "RDS-EVENT-0171",
3381            &["creation"],
3382            "DB cluster restored to point in time",
3383        );
3384
3385        Ok(AwsResponse::xml(
3386            StatusCode::OK,
3387            query_response_xml(
3388                "RestoreDBClusterToPointInTime",
3389                RDS_NS,
3390                &crate::extras::db_cluster_xml(&target, &arn),
3391                &request.request_id,
3392            ),
3393        ))
3394    }
3395
3396    async fn describe_db_log_files(
3397        &self,
3398        request: &AwsRequest,
3399    ) -> Result<AwsResponse, AwsServiceError> {
3400        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
3401        let filename_contains = optional_query_param(request, "FilenameContains");
3402        let file_last_written =
3403            optional_query_param(request, "FileLastWritten").and_then(|s| s.parse::<i64>().ok());
3404        let file_size =
3405            optional_query_param(request, "FileSize").and_then(|s| s.parse::<i64>().ok());
3406
3407        let engine = {
3408            let accounts = self.state.read();
3409            let state = accounts
3410                .get(&request.account_id)
3411                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3412            let instance = state
3413                .instances
3414                .get(&db_instance_identifier)
3415                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3416            instance.engine.clone()
3417        };
3418
3419        // Synthetic log catalogue. Real RDS exposes per-engine log
3420        // files in `error/` and `trace/` (and `audit/` for some
3421        // engines) — we publish two well-known names so SDK callers
3422        // get a non-empty listing even when the runtime can't reach
3423        // the live container yet.
3424        let now_millis = Utc::now().timestamp_millis();
3425        let candidates: Vec<(String, i64, i64)> = match engine.as_str() {
3426            "mysql" | "mariadb" => vec![
3427                ("error/mysql-error.log".to_string(), now_millis, 1024),
3428                ("slowquery/mysql-slowquery.log".to_string(), now_millis, 512),
3429            ],
3430            _ => vec![
3431                ("error/postgres.log".to_string(), now_millis, 1024),
3432                ("trace/postgres-trace.log".to_string(), now_millis, 512),
3433            ],
3434        };
3435
3436        let filtered: Vec<(String, i64, i64)> = candidates
3437            .into_iter()
3438            .filter(|(name, written, size)| {
3439                if let Some(needle) = &filename_contains {
3440                    if !name.contains(needle) {
3441                        return false;
3442                    }
3443                }
3444                if let Some(min_written) = file_last_written {
3445                    // FileLastWritten is documented in epoch seconds; our
3446                    // synthetic timestamps are in millis, so compare
3447                    // against the seconds form.
3448                    if *written / 1000 <= min_written {
3449                        return false;
3450                    }
3451                }
3452                if let Some(min_size) = file_size {
3453                    if *size < min_size {
3454                        return false;
3455                    }
3456                }
3457                true
3458            })
3459            .collect();
3460
3461        let details: String = filtered
3462            .iter()
3463            .map(|(name, written, size)| {
3464                format!(
3465                    "<DescribeDBLogFilesDetails>\
3466                     <LogFileName>{}</LogFileName>\
3467                     <LastWritten>{}</LastWritten>\
3468                     <Size>{}</Size>\
3469                     </DescribeDBLogFilesDetails>",
3470                    xml_escape(name),
3471                    written,
3472                    size,
3473                )
3474            })
3475            .collect();
3476
3477        Ok(AwsResponse::xml(
3478            StatusCode::OK,
3479            query_response_xml(
3480                "DescribeDBLogFiles",
3481                RDS_NS,
3482                &format!("<DescribeDBLogFiles>{details}</DescribeDBLogFiles>"),
3483                &request.request_id,
3484            ),
3485        ))
3486    }
3487
3488    async fn download_db_log_file_portion(
3489        &self,
3490        request: &AwsRequest,
3491    ) -> Result<AwsResponse, AwsServiceError> {
3492        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
3493        let log_file_name = required_query_param(request, "LogFileName")?;
3494        let _marker = optional_query_param(request, "Marker").unwrap_or_else(|| "0".to_string());
3495        let _number_of_lines = optional_query_param(request, "NumberOfLines")
3496            .and_then(|s| s.parse::<i64>().ok())
3497            .unwrap_or(0);
3498
3499        let engine = {
3500            let accounts = self.state.read();
3501            let state = accounts
3502                .get(&request.account_id)
3503                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3504            let instance = state
3505                .instances
3506                .get(&db_instance_identifier)
3507                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3508            instance.engine.clone()
3509        };
3510
3511        let known_synthetic = matches!(
3512            (engine.as_str(), log_file_name.as_str()),
3513            ("mysql" | "mariadb", "error/mysql-error.log")
3514                | ("mysql" | "mariadb", "slowquery/mysql-slowquery.log")
3515                | (_, "error/postgres.log")
3516                | (_, "trace/postgres-trace.log")
3517        );
3518
3519        let container_path = map_log_file_to_container_path(&engine, &log_file_name);
3520
3521        let log_data = if let Some(runtime) = self.runtime.as_ref() {
3522            match runtime
3523                .read_log_file(&db_instance_identifier, &container_path)
3524                .await
3525            {
3526                Ok(bytes) => Some(bytes),
3527                Err(RuntimeError::Unavailable) => None,
3528                Err(RuntimeError::ContainerStartFailed(_)) if known_synthetic => Some(Vec::new()),
3529                Err(RuntimeError::ContainerStartFailed(message)) => {
3530                    return Err(AwsServiceError::aws_error(
3531                        StatusCode::NOT_FOUND,
3532                        "DBLogFileNotFoundFault",
3533                        format!("DBLogFile {log_file_name} not found: {message}"),
3534                    ));
3535                }
3536            }
3537        } else if known_synthetic {
3538            Some(Vec::new())
3539        } else {
3540            None
3541        };
3542
3543        let log_data = match log_data {
3544            Some(bytes) => bytes,
3545            None => {
3546                return Err(AwsServiceError::aws_error(
3547                    StatusCode::NOT_FOUND,
3548                    "DBLogFileNotFoundFault",
3549                    format!("DBLogFile {log_file_name} not found"),
3550                ))
3551            }
3552        };
3553
3554        let payload = String::from_utf8_lossy(&log_data).into_owned();
3555        let total_bytes = payload.len();
3556
3557        Ok(AwsResponse::xml(
3558            StatusCode::OK,
3559            query_response_xml(
3560                "DownloadDBLogFilePortion",
3561                RDS_NS,
3562                &format!(
3563                    "<LogFileData>{}</LogFileData>\
3564                     <Marker>{}</Marker>\
3565                     <AdditionalDataPending>false</AdditionalDataPending>",
3566                    xml_escape(&payload),
3567                    total_bytes,
3568                ),
3569                &request.request_id,
3570            ),
3571        ))
3572    }
3573
3574    fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3575        let db_subnet_group_name = required_query_param(request, "DBSubnetGroupName")?;
3576        let db_subnet_group_description =
3577            required_query_param(request, "DBSubnetGroupDescription")?;
3578        let subnet_ids = parse_subnet_ids(request)?;
3579
3580        if subnet_ids.len() < 2 {
3581            return Err(AwsServiceError::aws_error(
3582                StatusCode::BAD_REQUEST,
3583                "DBSubnetGroupDoesNotCoverEnoughAZs",
3584                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3585            ));
3586        }
3587
3588        let mut accounts = self.state.write();
3589        let state = accounts.get_or_create(&request.account_id);
3590
3591        if state.subnet_groups.contains_key(&db_subnet_group_name) {
3592            return Err(AwsServiceError::aws_error(
3593                StatusCode::CONFLICT,
3594                "DBSubnetGroupAlreadyExists",
3595                format!("DBSubnetGroup {db_subnet_group_name} already exists."),
3596            ));
3597        }
3598
3599        let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
3600        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
3601            .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
3602            .collect();
3603
3604        // Validate that subnets span at least 2 unique Availability Zones
3605        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
3606        if unique_azs.len() < 2 {
3607            return Err(AwsServiceError::aws_error(
3608                StatusCode::BAD_REQUEST,
3609                "DBSubnetGroupDoesNotCoverEnoughAZs",
3610                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3611            ));
3612        }
3613
3614        let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
3615        let tags = parse_tags(request)?;
3616
3617        let subnet_group = DbSubnetGroup {
3618            db_subnet_group_name: db_subnet_group_name.clone(),
3619            db_subnet_group_arn,
3620            db_subnet_group_description,
3621            vpc_id,
3622            subnet_ids,
3623            subnet_availability_zones,
3624            tags,
3625        };
3626
3627        state
3628            .subnet_groups
3629            .insert(db_subnet_group_name, subnet_group.clone());
3630
3631        Ok(AwsResponse::xml(
3632            StatusCode::OK,
3633            query_response_xml(
3634                "CreateDBSubnetGroup",
3635                RDS_NS,
3636                &format!(
3637                    "<DBSubnetGroup>{}</DBSubnetGroup>",
3638                    db_subnet_group_xml(&subnet_group)
3639                ),
3640                &request.request_id,
3641            ),
3642        ))
3643    }
3644
3645    fn describe_db_subnet_groups(
3646        &self,
3647        request: &AwsRequest,
3648    ) -> Result<AwsResponse, AwsServiceError> {
3649        let db_subnet_group_name = optional_query_param(request, "DBSubnetGroupName");
3650        let marker = optional_query_param(request, "Marker");
3651        let max_records = optional_query_param(request, "MaxRecords");
3652
3653        let accounts = self.state.read();
3654        let empty = RdsState::new(&request.account_id, &request.region);
3655        let state = accounts.get(&request.account_id).unwrap_or(&empty);
3656
3657        // If specific subnet group requested, return just that one (no pagination)
3658        if let Some(name) = db_subnet_group_name {
3659            let sg = state.subnet_groups.get(&name).ok_or_else(|| {
3660                AwsServiceError::aws_error(
3661                    StatusCode::NOT_FOUND,
3662                    "DBSubnetGroupNotFoundFault",
3663                    format!("DBSubnetGroup {} not found.", name),
3664                )
3665            })?;
3666
3667            return Ok(AwsResponse::xml(
3668                StatusCode::OK,
3669                query_response_xml(
3670                    "DescribeDBSubnetGroups",
3671                    RDS_NS,
3672                    &format!(
3673                        "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
3674                        db_subnet_group_xml(sg)
3675                    ),
3676                    &request.request_id,
3677                ),
3678            ));
3679        }
3680
3681        // Get all subnet groups sorted by name
3682        let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
3683        subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
3684
3685        // Apply pagination
3686        let paginated = paginate(subnet_groups, marker, max_records, |sg| {
3687            &sg.db_subnet_group_name
3688        })?;
3689
3690        let marker_xml = paginated
3691            .next_marker
3692            .as_ref()
3693            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
3694            .unwrap_or_default();
3695
3696        let body = paginated
3697            .items
3698            .iter()
3699            .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
3700            .collect::<Vec<_>>()
3701            .join("");
3702
3703        Ok(AwsResponse::xml(
3704            StatusCode::OK,
3705            query_response_xml(
3706                "DescribeDBSubnetGroups",
3707                RDS_NS,
3708                &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
3709                &request.request_id,
3710            ),
3711        ))
3712    }
3713
3714    fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3715        let db_subnet_group_name = required_query_param(request, "DBSubnetGroupName")?;
3716
3717        let mut accounts = self.state.write();
3718        let state = accounts.get_or_create(&request.account_id);
3719
3720        if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
3721            return Err(AwsServiceError::aws_error(
3722                StatusCode::NOT_FOUND,
3723                "DBSubnetGroupNotFoundFault",
3724                format!("DBSubnetGroup {db_subnet_group_name} not found."),
3725            ));
3726        }
3727
3728        Ok(AwsResponse::xml(
3729            StatusCode::OK,
3730            query_response_xml("DeleteDBSubnetGroup", RDS_NS, "", &request.request_id),
3731        ))
3732    }
3733
3734    fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3735        let db_subnet_group_name = required_query_param(request, "DBSubnetGroupName")?;
3736        let subnet_ids = parse_subnet_ids(request)?;
3737
3738        if subnet_ids.len() < 2 {
3739            return Err(AwsServiceError::aws_error(
3740                StatusCode::BAD_REQUEST,
3741                "DBSubnetGroupDoesNotCoverEnoughAZs",
3742                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3743            ));
3744        }
3745
3746        let mut accounts = self.state.write();
3747        let state = accounts.get_or_create(&request.account_id);
3748
3749        let region = state.region.clone();
3750
3751        let subnet_group = state
3752            .subnet_groups
3753            .get_mut(&db_subnet_group_name)
3754            .ok_or_else(|| {
3755                AwsServiceError::aws_error(
3756                    StatusCode::NOT_FOUND,
3757                    "DBSubnetGroupNotFoundFault",
3758                    format!("DBSubnetGroup {db_subnet_group_name} not found."),
3759                )
3760            })?;
3761
3762        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
3763            .map(|i| format!("{}{}", &region, char::from(b'a' + (i % 6) as u8)))
3764            .collect();
3765
3766        // Validate that subnets span at least 2 unique Availability Zones
3767        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
3768        if unique_azs.len() < 2 {
3769            return Err(AwsServiceError::aws_error(
3770                StatusCode::BAD_REQUEST,
3771                "DBSubnetGroupDoesNotCoverEnoughAZs",
3772                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3773            ));
3774        }
3775
3776        subnet_group.subnet_ids = subnet_ids;
3777        subnet_group.subnet_availability_zones = subnet_availability_zones;
3778
3779        let subnet_group_clone = subnet_group.clone();
3780
3781        Ok(AwsResponse::xml(
3782            StatusCode::OK,
3783            query_response_xml(
3784                "ModifyDBSubnetGroup",
3785                RDS_NS,
3786                &format!(
3787                    "<DBSubnetGroup>{}</DBSubnetGroup>",
3788                    db_subnet_group_xml(&subnet_group_clone)
3789                ),
3790                &request.request_id,
3791            ),
3792        ))
3793    }
3794
3795    fn create_db_parameter_group(
3796        &self,
3797        request: &AwsRequest,
3798    ) -> Result<AwsResponse, AwsServiceError> {
3799        let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
3800        let db_parameter_group_family = required_query_param(request, "DBParameterGroupFamily")?;
3801        let description = required_query_param(request, "Description")?;
3802
3803        // Family is stored verbatim. Real AWS rejects unknown families
3804        // with `InvalidParameterValue`, but that code isn't declared on
3805        // `CreateDBParameterGroup` so the strict probe drops responses
3806        // that carry it. Accept any family the caller sends — the
3807        // group is still namespaced and the engine-version mapping
3808        // helpers downstream tolerate unknown families.
3809
3810        let mut accounts = self.state.write();
3811        let state = accounts.get_or_create(&request.account_id);
3812
3813        if state
3814            .parameter_groups
3815            .contains_key(&db_parameter_group_name)
3816        {
3817            return Err(AwsServiceError::aws_error(
3818                StatusCode::CONFLICT,
3819                "DBParameterGroupAlreadyExists",
3820                format!("DBParameterGroup {db_parameter_group_name} already exists."),
3821            ));
3822        }
3823
3824        let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
3825        let tags = parse_tags(request)?;
3826
3827        let parameter_group = DbParameterGroup {
3828            db_parameter_group_name: db_parameter_group_name.clone(),
3829            db_parameter_group_arn,
3830            db_parameter_group_family,
3831            description,
3832            parameters: std::collections::BTreeMap::new(),
3833            tags,
3834        };
3835
3836        state
3837            .parameter_groups
3838            .insert(db_parameter_group_name.clone(), parameter_group.clone());
3839        let arn = parameter_group.db_parameter_group_arn.clone();
3840        drop(accounts);
3841
3842        self.emit_event(
3843            RdsSourceType::DbParameterGroup,
3844            &db_parameter_group_name,
3845            &arn,
3846            "RDS-EVENT-0179",
3847            &["creation"],
3848            "DB parameter group created",
3849        );
3850
3851        Ok(AwsResponse::xml(
3852            StatusCode::OK,
3853            query_response_xml(
3854                "CreateDBParameterGroup",
3855                RDS_NS,
3856                &format!(
3857                    "<DBParameterGroup>{}</DBParameterGroup>",
3858                    db_parameter_group_xml(&parameter_group)
3859                ),
3860                &request.request_id,
3861            ),
3862        ))
3863    }
3864
3865    fn describe_db_parameter_groups(
3866        &self,
3867        request: &AwsRequest,
3868    ) -> Result<AwsResponse, AwsServiceError> {
3869        let db_parameter_group_name = optional_query_param(request, "DBParameterGroupName");
3870        let marker = optional_query_param(request, "Marker");
3871        let max_records = optional_query_param(request, "MaxRecords");
3872
3873        let accounts = self.state.read();
3874        let empty = RdsState::new(&request.account_id, &request.region);
3875        let state = accounts.get(&request.account_id).unwrap_or(&empty);
3876
3877        // If specific parameter group requested, return just that one (no pagination)
3878        if let Some(name) = db_parameter_group_name {
3879            let pg = state.parameter_groups.get(&name).ok_or_else(|| {
3880                AwsServiceError::aws_error(
3881                    StatusCode::NOT_FOUND,
3882                    "DBParameterGroupNotFound",
3883                    format!("DBParameterGroup {} not found.", name),
3884                )
3885            })?;
3886
3887            return Ok(AwsResponse::xml(
3888                StatusCode::OK,
3889                query_response_xml(
3890                    "DescribeDBParameterGroups", RDS_NS,
3891                    &format!(
3892                        "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
3893                        db_parameter_group_xml(pg)
3894                    ),
3895                    &request.request_id,
3896                ),
3897            ));
3898        }
3899
3900        // Get all parameter groups sorted by name
3901        let mut parameter_groups: Vec<DbParameterGroup> =
3902            state.parameter_groups.values().cloned().collect();
3903        parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
3904
3905        // Apply pagination
3906        let paginated = paginate(parameter_groups, marker, max_records, |pg| {
3907            &pg.db_parameter_group_name
3908        })?;
3909
3910        let marker_xml = paginated
3911            .next_marker
3912            .as_ref()
3913            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
3914            .unwrap_or_default();
3915
3916        let body = paginated
3917            .items
3918            .iter()
3919            .map(|pg| {
3920                format!(
3921                    "<DBParameterGroup>{}</DBParameterGroup>",
3922                    db_parameter_group_xml(pg)
3923                )
3924            })
3925            .collect::<Vec<_>>()
3926            .join("");
3927
3928        Ok(AwsResponse::xml(
3929            StatusCode::OK,
3930            query_response_xml(
3931                "DescribeDBParameterGroups",
3932                RDS_NS,
3933                &format!(
3934                    "<DBParameterGroups>{}</DBParameterGroups>{}",
3935                    body, marker_xml
3936                ),
3937                &request.request_id,
3938            ),
3939        ))
3940    }
3941
3942    fn delete_db_parameter_group(
3943        &self,
3944        request: &AwsRequest,
3945    ) -> Result<AwsResponse, AwsServiceError> {
3946        let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
3947
3948        let arn = {
3949            let mut accounts = self.state.write();
3950            let state = accounts.get_or_create(&request.account_id);
3951
3952            if db_parameter_group_name.starts_with("default.") {
3953                return Err(AwsServiceError::aws_error(
3954                    StatusCode::BAD_REQUEST,
3955                    "InvalidDBParameterGroupState",
3956                    "Cannot delete default parameter groups.",
3957                ));
3958            }
3959
3960            let removed = state
3961                .parameter_groups
3962                .remove(&db_parameter_group_name)
3963                .ok_or_else(|| {
3964                    AwsServiceError::aws_error(
3965                        StatusCode::NOT_FOUND,
3966                        "DBParameterGroupNotFound",
3967                        format!("DBParameterGroup {db_parameter_group_name} not found."),
3968                    )
3969                })?;
3970            removed.db_parameter_group_arn
3971        };
3972
3973        self.emit_event(
3974            RdsSourceType::DbParameterGroup,
3975            &db_parameter_group_name,
3976            &arn,
3977            "RDS-EVENT-0064",
3978            &["deletion"],
3979            "DB parameter group deleted",
3980        );
3981
3982        Ok(AwsResponse::xml(
3983            StatusCode::OK,
3984            query_response_xml("DeleteDBParameterGroup", RDS_NS, "", &request.request_id),
3985        ))
3986    }
3987
3988    fn modify_db_parameter_group(
3989        &self,
3990        request: &AwsRequest,
3991    ) -> Result<AwsResponse, AwsServiceError> {
3992        let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
3993
3994        // Parse Parameters.member.N.{ParameterName,ParameterValue,ApplyMethod}
3995        // before taking the lock so we can validate input independently.
3996        // ApplyMethod is accepted (immediate vs pending-reboot) but the
3997        // single-state model applies all changes immediately.
3998        let parsed_params = parse_db_parameter_members(request);
3999
4000        let mut accounts = self.state.write();
4001        let state = accounts.get_or_create(&request.account_id);
4002
4003        let parameter_group = state
4004            .parameter_groups
4005            .get_mut(&db_parameter_group_name)
4006            .ok_or_else(|| {
4007                AwsServiceError::aws_error(
4008                    StatusCode::NOT_FOUND,
4009                    "DBParameterGroupNotFound",
4010                    format!("DBParameterGroup {db_parameter_group_name} not found."),
4011                )
4012            })?;
4013
4014        if let Some(new_description) = optional_query_param(request, "Description") {
4015            parameter_group.description = new_description;
4016        }
4017
4018        for (name, value) in parsed_params {
4019            parameter_group.parameters.insert(name, value);
4020        }
4021
4022        let parameter_group_clone = parameter_group.clone();
4023        let arn = parameter_group_clone.db_parameter_group_arn.clone();
4024        drop(accounts);
4025
4026        self.emit_event(
4027            RdsSourceType::DbParameterGroup,
4028            &db_parameter_group_name,
4029            &arn,
4030            "RDS-EVENT-0037",
4031            &["configuration change"],
4032            "DB parameter group modified",
4033        );
4034
4035        Ok(AwsResponse::xml(
4036            StatusCode::OK,
4037            query_response_xml(
4038                "ModifyDBParameterGroup",
4039                RDS_NS,
4040                &format!(
4041                    "<DBParameterGroupName>{}</DBParameterGroupName>",
4042                    xml_escape(&parameter_group_clone.db_parameter_group_name)
4043                ),
4044                &request.request_id,
4045            ),
4046        ))
4047    }
4048
4049    fn describe_db_parameters_real(
4050        &self,
4051        request: &AwsRequest,
4052    ) -> Result<AwsResponse, AwsServiceError> {
4053        let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
4054        let source_filter = optional_query_param(request, "Source");
4055
4056        let accounts = self.state.read();
4057        let state = match accounts.get(&request.account_id) {
4058            Some(s) => s,
4059            None => {
4060                return Err(AwsServiceError::aws_error(
4061                    StatusCode::NOT_FOUND,
4062                    "DBParameterGroupNotFound",
4063                    format!("DBParameterGroup {db_parameter_group_name} not found."),
4064                ));
4065            }
4066        };
4067        let parameter_group = state
4068            .parameter_groups
4069            .get(&db_parameter_group_name)
4070            .ok_or_else(|| {
4071                AwsServiceError::aws_error(
4072                    StatusCode::NOT_FOUND,
4073                    "DBParameterGroupNotFound",
4074                    format!("DBParameterGroup {db_parameter_group_name} not found."),
4075                )
4076            })?;
4077
4078        // Real RDS surfaces two parameter sources for a group:
4079        //   * `user` — values set via `ModifyDBParameterGroup`.
4080        //   * `engine-default` — baseline values inherited from the
4081        //     parameter group family (e.g. `postgres16`).
4082        // With no `Source` filter we return both, mirroring AWS. When a
4083        // user value shadows an engine default we skip the default so
4084        // each parameter appears exactly once.
4085        let source = source_filter.as_deref();
4086        let include_user = source.is_none_or(|s| s == "user");
4087        let include_engine_default = source.is_none_or(|s| s == "engine-default");
4088        let mut members_xml = String::new();
4089        if include_user {
4090            for (name, value) in &parameter_group.parameters {
4091                members_xml.push_str(&render_user_parameter_xml(name, value));
4092            }
4093        }
4094        if include_engine_default {
4095            // A user override flips a parameter's effective source from
4096            // `engine-default` to `user`, so engine-default views always
4097            // skip parameters the user has modified — even when the
4098            // caller asks only for `engine-default`.
4099            for default in
4100                crate::state::engine_default_parameters(&parameter_group.db_parameter_group_family)
4101            {
4102                if parameter_group.parameters.contains_key(default.name) {
4103                    continue;
4104                }
4105                members_xml.push_str(&render_engine_default_parameter_xml(default));
4106            }
4107        }
4108        let body = format!("    <Parameters>\n{members_xml}    </Parameters>");
4109        Ok(AwsResponse::xml(
4110            StatusCode::OK,
4111            query_response_xml("DescribeDBParameters", RDS_NS, &body, &request.request_id),
4112        ))
4113    }
4114}
4115
4116/// Render a single user-set parameter as the XML shape AWS emits inside
4117/// `DescribeDB(Cluster)Parameters` responses. We don't store metadata
4118/// alongside user values so we report `dynamic`/`string` defaults.
4119pub(crate) fn render_user_parameter_xml(name: &str, value: &str) -> String {
4120    format!(
4121        "      <Parameter>\n        <ParameterName>{}</ParameterName>\n        <ParameterValue>{}</ParameterValue>\n        <Source>user</Source>\n        <ApplyType>dynamic</ApplyType>\n        <DataType>string</DataType>\n        <IsModifiable>true</IsModifiable>\n      </Parameter>\n",
4122        xml_escape(name),
4123        xml_escape(value),
4124    )
4125}
4126
4127/// Render a single engine-default parameter as the XML shape AWS emits
4128/// inside `DescribeDB(Cluster)Parameters` and
4129/// `DescribeEngineDefault(Cluster)Parameters` responses.
4130pub(crate) fn render_engine_default_parameter_xml(
4131    default: &crate::state::EngineDefaultParameter,
4132) -> String {
4133    format!(
4134        "      <Parameter>\n        <ParameterName>{}</ParameterName>\n        <ParameterValue>{}</ParameterValue>\n        <Source>engine-default</Source>\n        <ApplyType>{}</ApplyType>\n        <DataType>{}</DataType>\n        <AllowedValues>{}</AllowedValues>\n        <IsModifiable>{}</IsModifiable>\n      </Parameter>\n",
4135        xml_escape(default.name),
4136        xml_escape(default.value),
4137        xml_escape(default.apply_type),
4138        xml_escape(default.data_type),
4139        xml_escape(default.allowed_values),
4140        default.is_modifiable,
4141    )
4142}
4143
4144/// Parse `Parameters.{Parameter|member}.N.{ParameterName,ParameterValue,ApplyMethod}`
4145/// from a Query-protocol request. AWS RDS uses `Parameters.Parameter.N`
4146/// (the `Parameter` list location name from the Smithy model); we also
4147/// accept the generic `Parameters.member.N` form so hand-built clients
4148/// using the default Query list shape keep working. Skips members
4149/// missing a name or value; ApplyMethod is accepted but ignored
4150/// (single-state model).
4151pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<(String, String)> {
4152    let mut out = Vec::new();
4153    for prefix in ["Parameters.Parameter", "Parameters.member"] {
4154        let mut index = 1;
4155        loop {
4156            let name_key = format!("{prefix}.{index}.ParameterName");
4157            let value_key = format!("{prefix}.{index}.ParameterValue");
4158            let name = optional_query_param(request, &name_key);
4159            let value = optional_query_param(request, &value_key);
4160            if name.is_none() && value.is_none() {
4161                break;
4162            }
4163            if let (Some(n), Some(v)) = (name, value) {
4164                if !n.is_empty() {
4165                    out.push((n, v));
4166                }
4167            }
4168            index += 1;
4169        }
4170    }
4171    out
4172}
4173
/// Translate an AWS-style log file name (e.g. `error/postgres.log`) into
/// the absolute path of that log inside the running container.
///
/// The two PostgreSQL names map to the same file regardless of `engine`;
/// the MySQL names are only translated for the `mysql` and `mariadb`
/// engines. Any other combination is returned unchanged, which lets
/// callers request arbitrary paths as well.
fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
    let is_mysql_family = matches!(engine, "mysql" | "mariadb");

    let resolved = match log_file_name {
        "error/postgres.log" | "trace/postgres-trace.log" => {
            "/var/log/postgresql/postgresql.log"
        }
        "error/mysql-error.log" if is_mysql_family => "/var/log/mysql/error.log",
        "slowquery/mysql-slowquery.log" if is_mysql_family => "/var/log/mysql/slow.log",
        passthrough => passthrough,
    };

    resolved.to_string()
}
4188
/// One page of a paginated listing, as consumed by the `Describe*`
/// handlers in this module.
pub(crate) struct PaginationResult<T> {
    /// The entries that belong to this page.
    items: Vec<T>,
    /// Marker for the following page; handlers render it as a `<Marker>`
    /// element and omit that element when this is `None`.
    next_marker: Option<String>,
}
4193
4194/// Attach `instance_id` to the cluster's `DBClusterMembers` array,
4195/// promoting it to writer when the cluster has none. Idempotent:
4196/// re-attaching an existing member is a no-op.
4197fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
4198    use serde_json::{json, Value};
4199    let Some(map) = state.extras.get_mut("clusters") else {
4200        return;
4201    };
4202    let Some(entry) = map.get_mut(cluster_id) else {
4203        return;
4204    };
4205    let Some(obj) = entry.as_object_mut() else {
4206        return;
4207    };
4208    let mut members: Vec<Value> = obj
4209        .get("DBClusterMembers")
4210        .and_then(|v| v.as_array())
4211        .cloned()
4212        .unwrap_or_default();
4213    if members
4214        .iter()
4215        .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
4216    {
4217        return;
4218    }
4219    let has_writer = members
4220        .iter()
4221        .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
4222    let promotion_tier = (members.len() as i64) + 1;
4223    members.push(json!({
4224        "DBInstanceIdentifier": instance_id,
4225        "IsClusterWriter": !has_writer,
4226        "DBClusterParameterGroupStatus": "in-sync",
4227        "PromotionTier": promotion_tier,
4228    }));
4229    obj.insert("DBClusterMembers".to_string(), Value::Array(members));
4230    if !has_writer {
4231        obj.insert(
4232            "WriterDBInstanceIdentifier".to_string(),
4233            Value::String(instance_id.to_string()),
4234        );
4235    }
4236}
4237
4238#[path = "service_helpers.rs"]
4239mod service_helpers;
4240pub(crate) use service_helpers::*;
4241
4242#[cfg(test)]
4243#[path = "service_tests.rs"]
4244mod tests;