Skip to main content

fakecloud_rds/service/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20    SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
/// XML namespace handed to `query_response_xml` when this file renders
/// RDS responses (the `2014-10-31` API document namespace).
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
/// Action names reported by `RdsService::supported_actions`.
///
/// Entries are kept in ASCII-sorted order; keep that ordering when adding
/// a new action so the table stays easy to scan and diff.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddRoleToDBCluster",
    "AddRoleToDBInstance",
    "AddSourceIdentifierToSubscription",
    "AddTagsToResource",
    "ApplyPendingMaintenanceAction",
    "AuthorizeDBSecurityGroupIngress",
    "BacktrackDBCluster",
    "CancelExportTask",
    "CopyDBClusterParameterGroup",
    "CopyDBClusterSnapshot",
    "CopyDBParameterGroup",
    "CopyDBSnapshot",
    "CopyOptionGroup",
    "CreateBlueGreenDeployment",
    "CreateCustomDBEngineVersion",
    "CreateDBCluster",
    "CreateDBClusterEndpoint",
    "CreateDBClusterParameterGroup",
    "CreateDBClusterSnapshot",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBProxy",
    "CreateDBProxyEndpoint",
    "CreateDBSecurityGroup",
    "CreateDBShardGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "CreateEventSubscription",
    "CreateGlobalCluster",
    "CreateIntegration",
    "CreateOptionGroup",
    "CreateTenantDatabase",
    "DeleteBlueGreenDeployment",
    "DeleteCustomDBEngineVersion",
    "DeleteDBCluster",
    "DeleteDBClusterAutomatedBackup",
    "DeleteDBClusterEndpoint",
    "DeleteDBClusterParameterGroup",
    "DeleteDBClusterSnapshot",
    "DeleteDBInstance",
    "DeleteDBInstanceAutomatedBackup",
    "DeleteDBParameterGroup",
    "DeleteDBProxy",
    "DeleteDBProxyEndpoint",
    "DeleteDBSecurityGroup",
    "DeleteDBShardGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DeleteEventSubscription",
    "DeleteGlobalCluster",
    "DeleteIntegration",
    "DeleteOptionGroup",
    "DeleteTenantDatabase",
    "DeregisterDBProxyTargets",
    "DescribeAccountAttributes",
    "DescribeBlueGreenDeployments",
    "DescribeCertificates",
    "DescribeDBClusterAutomatedBackups",
    "DescribeDBClusterBacktracks",
    "DescribeDBClusterEndpoints",
    "DescribeDBClusterParameterGroups",
    "DescribeDBClusterParameters",
    "DescribeDBClusterSnapshotAttributes",
    "DescribeDBClusterSnapshots",
    "DescribeDBClusters",
    "DescribeDBEngineVersions",
    "DescribeDBInstanceAutomatedBackups",
    "DescribeDBInstances",
    "DescribeDBLogFiles",
    "DescribeDBMajorEngineVersions",
    "DescribeDBParameterGroups",
    "DescribeDBParameters",
    "DescribeDBProxies",
    "DescribeDBProxyEndpoints",
    "DescribeDBProxyTargetGroups",
    "DescribeDBProxyTargets",
    "DescribeDBRecommendations",
    "DescribeDBSecurityGroups",
    "DescribeDBShardGroups",
    "DescribeDBSnapshotAttributes",
    "DescribeDBSnapshotTenantDatabases",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeEngineDefaultClusterParameters",
    "DescribeEngineDefaultParameters",
    "DescribeEventCategories",
    "DescribeEventSubscriptions",
    "DescribeEvents",
    "DescribeExportTasks",
    "DescribeGlobalClusters",
    "DescribeIntegrations",
    "DescribeOptionGroupOptions",
    "DescribeOptionGroups",
    "DescribeOrderableDBInstanceOptions",
    "DescribePendingMaintenanceActions",
    "DescribeReservedDBInstances",
    "DescribeReservedDBInstancesOfferings",
    "DescribeServerlessV2PlatformVersions",
    "DescribeSourceRegions",
    "DescribeTenantDatabases",
    "DescribeValidDBInstanceModifications",
    "DisableHttpEndpoint",
    "DownloadDBLogFilePortion",
    "EnableHttpEndpoint",
    "FailoverDBCluster",
    "FailoverGlobalCluster",
    "ListTagsForResource",
    "ModifyActivityStream",
    "ModifyCertificates",
    "ModifyCurrentDBClusterCapacity",
    "ModifyCustomDBEngineVersion",
    "ModifyDBCluster",
    "ModifyDBClusterEndpoint",
    "ModifyDBClusterParameterGroup",
    "ModifyDBClusterSnapshotAttribute",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBProxy",
    "ModifyDBProxyEndpoint",
    "ModifyDBProxyTargetGroup",
    "ModifyDBRecommendation",
    "ModifyDBShardGroup",
    "ModifyDBSnapshot",
    "ModifyDBSnapshotAttribute",
    "ModifyDBSubnetGroup",
    "ModifyEventSubscription",
    "ModifyGlobalCluster",
    "ModifyIntegration",
    "ModifyOptionGroup",
    "ModifyTenantDatabase",
    "PromoteReadReplica",
    "PromoteReadReplicaDBCluster",
    "PurchaseReservedDBInstancesOffering",
    "RebootDBCluster",
    "RebootDBInstance",
    "RebootDBShardGroup",
    "RegisterDBProxyTargets",
    "RemoveFromGlobalCluster",
    "RemoveRoleFromDBCluster",
    "RemoveRoleFromDBInstance",
    "RemoveSourceIdentifierFromSubscription",
    "RemoveTagsFromResource",
    "ResetDBClusterParameterGroup",
    "ResetDBParameterGroup",
    "RestoreDBClusterFromS3",
    "RestoreDBClusterFromSnapshot",
    "RestoreDBClusterToPointInTime",
    "RestoreDBInstanceFromDBSnapshot",
    "RestoreDBInstanceFromS3",
    "RestoreDBInstanceToPointInTime",
    "RevokeDBSecurityGroupIngress",
    "StartActivityStream",
    "StartDBCluster",
    "StartDBInstance",
    "StartDBInstanceAutomatedBackupsReplication",
    "StartExportTask",
    "StopActivityStream",
    "StopDBCluster",
    "StopDBInstance",
    "StopDBInstanceAutomatedBackupsReplication",
    "SwitchoverBlueGreenDeployment",
    "SwitchoverGlobalCluster",
    "SwitchoverReadReplica",
];
191
/// RDS request handler: the shared state plus the optional collaborators
/// (container runtime, snapshot store, event delivery bus) attached through
/// the `with_*` builders.
pub struct RdsService {
    /// Shared RDS state, accessed through `read()` / `write()` guards.
    pub(crate) state: SharedRdsState,
    /// Container runtime; `None` until `with_runtime` is called.
    runtime: Option<Arc<RdsRuntime>>,
    /// Destination for persisted snapshots; `None` until
    /// `with_snapshot_store` is called, in which case saves are skipped.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of each snapshot save so writes
    /// do not overlap.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Bus used when emitting `aws.rds` events; `None` until
    /// `with_delivery_bus` is called.
    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
}
199
/// Kind of RDS resource an `aws.rds` event refers to. Each variant carries
/// three spellings: the EventBridge `SourceType`, the `DescribeEvents`
/// `SourceType`, and the EventBridge detail-type.
#[derive(Clone, Copy)]
#[allow(dead_code, clippy::enum_variant_names)]
pub(crate) enum RdsSourceType {
    DbInstance,
    DbSnapshot,
    DbParameterGroup,
    DbCluster,
    DbClusterSnapshot,
}

impl RdsSourceType {
    /// Single exhaustive table of every spelling for a variant, as
    /// `(EventBridge SourceType, DescribeEvents SourceType, detail-type)`.
    /// Keeping the three strings side by side means a new variant cannot
    /// be added without supplying all of them.
    const fn labels(self) -> (&'static str, &'static str, &'static str) {
        match self {
            RdsSourceType::DbInstance => ("DB_INSTANCE", "db-instance", "RDS DB Instance Event"),
            RdsSourceType::DbSnapshot => ("DB_SNAPSHOT", "db-snapshot", "RDS DB Snapshot Event"),
            RdsSourceType::DbParameterGroup => (
                "DB_PARAMETER_GROUP",
                "db-parameter-group",
                "RDS DB Parameter Group Event",
            ),
            RdsSourceType::DbCluster => ("DB_CLUSTER", "db-cluster", "RDS DB Cluster Event"),
            RdsSourceType::DbClusterSnapshot => (
                "DB_CLUSTER_SNAPSHOT",
                "db-cluster-snapshot",
                "RDS DB Cluster Snapshot Event",
            ),
        }
    }

    /// EventBridge `SourceType` enum string. Matches the SCREAMING_SNAKE
    /// form AWS publishes in the `aws.rds` event detail.
    fn as_str(self) -> &'static str {
        self.labels().0
    }

    /// `DescribeEvents` `SourceType` filter / response value. Per AWS
    /// API spec this is the kebab-case form (`db-instance`,
    /// `db-cluster`, `db-snapshot`, `db-parameter-group`, ...) — distinct
    /// from the EventBridge `SourceType` returned by [`Self::as_str`].
    pub(crate) fn describe_events_str(self) -> &'static str {
        self.labels().1
    }

    /// EventBridge `detail-type` string for events about this resource kind.
    fn detail_type(self) -> &'static str {
        self.labels().2
    }
}
248
249mod cluster_snapshots;
250mod engine;
251mod instances;
252mod log_files;
253mod parameter_groups;
254mod replicas;
255mod restore;
256mod snapshots;
257mod subnet_groups;
258mod tags;
259
impl RdsService {
    /// Borrow the shared RDS state handle without cloning it.
    pub(crate) fn state_handle(&self) -> &SharedRdsState {
        &self.state
    }
}
265
266impl RdsService {
    /// Build a service over `state` with no runtime, snapshot store, or
    /// delivery bus attached and a fresh snapshot lock. Use the `with_*`
    /// builders to attach the optional collaborators.
    pub fn new(state: SharedRdsState) -> Self {
        Self {
            state,
            runtime: None,
            snapshot_store: None,
            snapshot_lock: Arc::new(AsyncMutex::new(())),
            delivery_bus: None,
        }
    }
276
    /// Builder: attach the container runtime, replacing any previously set
    /// one, and return the service.
    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
        self.runtime = Some(runtime);
        self
    }
281
    /// Crate-internal accessor for the optional runtime; needed by the
    /// extras handler so cluster snapshot/restore paths can dump and
    /// replay member databases via the live container runtime.
    ///
    /// Returns `None` when no runtime has been attached.
    pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
        self.runtime.as_ref()
    }
288
    /// Builder: attach the snapshot store that state is persisted to,
    /// replacing any previously set one, and return the service.
    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
        self.snapshot_store = Some(store);
        self
    }
293
    /// Builder: attach the delivery bus used when emitting `aws.rds`
    /// events, replacing any previously set one, and return the service.
    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
        self.delivery_bus = Some(bus);
        self
    }
298
299    /// Emit an `aws.rds` EventBridge event mirroring the AWS RDS event schema.
300    /// Also records into the per-account events ring so DescribeEvents
301    /// can serve the row. No-op for the EventBridge side when the bus
302    /// isn't wired (tests, minimal configs).
303    pub(crate) fn emit_event(
304        &self,
305        source_type: RdsSourceType,
306        source_identifier: &str,
307        source_arn: &str,
308        event_id: &str,
309        event_categories: &[&str],
310        message: &str,
311    ) {
312        // Source the account_id off the source_arn (segment 4) — that's
313        // the canonical ARN form for RDS resources.
314        let account_id = source_arn.split(':').nth(4).unwrap_or("");
315        emit_event_static_with_state(
316            self.delivery_bus.as_ref(),
317            Some(&self.state),
318            if account_id.is_empty() {
319                None
320            } else {
321                Some(account_id)
322            },
323            source_type,
324            source_identifier,
325            source_arn,
326            event_id,
327            event_categories,
328            message,
329        );
330    }
331
332    async fn save_snapshot(&self) {
333        save_snapshot_static(
334            self.state.clone(),
335            self.snapshot_store.clone(),
336            self.snapshot_lock.clone(),
337        )
338        .await;
339    }
340
    /// Recreate the backing Docker/Podman containers for persisted DB
    /// instances after a fakecloud restart. Without this, persistent
    /// mode loads the row back into memory with `db_instance_status =
    /// available` but the container is gone, so the endpoint is dead
    /// (issue #1338). Each candidate is flipped to `starting`
    /// synchronously, then a background task brings the container back
    /// and flips it back to `available`. Instances persisted as
    /// `stopped` are skipped — `StartDBInstance` revives those.
    ///
    /// Returns immediately (doing nothing) when no runtime is attached or
    /// when no instance qualifies. The spawned tasks are detached: this
    /// method does not wait for the containers to come up. A task whose
    /// container fails to start marks its instance `failed`.
    pub async fn recover_persisted_containers(&self) {
        let Some(runtime) = self.runtime.clone() else {
            return;
        };

        // Owned copy of everything a recovery task needs, so the state
        // lock does not have to be held while containers are started.
        struct Pending {
            account_id: String,
            region: String,
            id: String,
            arn: String,
            engine: String,
            engine_version: String,
            username: String,
            password: String,
            db_name: String,
        }

        // Phase 1: under a single write guard, mark every candidate as
        // `starting` and collect its parameters. The guard is released at
        // the end of this block, before any task is spawned.
        let pending: Vec<Pending> = {
            let mut accounts = self.state.write();
            let mut out = Vec::new();
            for (_, state) in accounts.iter_mut() {
                let account_id = state.account_id.clone();
                let region = state.region.clone();
                for (id, inst) in state.instances.iter_mut() {
                    // "creating" is included so an instance whose background
                    // create task hadn't finished (and re-saved it as
                    // "available") when the process crashed is resumed rather
                    // than silently dropped on restart — the API already
                    // returned it to the client, so DescribeDBInstances must
                    // not lose it (bug-hunt 2026-06-13, finding 4.3). Recovery
                    // re-drives it through `ensure_*` to a live container.
                    if !matches!(
                        inst.db_instance_status.as_str(),
                        "creating"
                            | "available"
                            | "starting"
                            | "modifying"
                            | "rebooting"
                            | "backing-up"
                    ) {
                        continue;
                    }
                    inst.db_instance_status = "starting".to_string();
                    out.push(Pending {
                        account_id: account_id.clone(),
                        region: region.clone(),
                        id: id.clone(),
                        arn: inst.db_instance_arn.clone(),
                        engine: inst.engine.clone(),
                        engine_version: inst.engine_version.clone(),
                        username: inst.master_username.clone(),
                        password: inst.master_user_password.clone(),
                        // Fall back to the engine's default database name
                        // when the instance was created without one.
                        db_name: inst
                            .db_name
                            .clone()
                            .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
                    });
                }
            }
            out
        };

        if pending.is_empty() {
            return;
        }
        tracing::info!(
            count = pending.len(),
            "recovering backing containers for persisted rds instances",
        );

        // Phase 2: one detached task per instance. Each task owns clones
        // of the handles it needs, so it does not borrow from `self`.
        for p in pending {
            let runtime = runtime.clone();
            let state = self.state.clone();
            let snapshot_store = self.snapshot_store.clone();
            let snapshot_lock = self.snapshot_lock.clone();
            let delivery_bus = self.delivery_bus.clone();
            tokio::spawn(async move {
                // NOTE(review): `ensure_postgres` is called for every
                // engine; presumably it dispatches on `engine` internally
                // — confirm against the runtime.
                match runtime
                    .ensure_postgres(
                        &p.id,
                        &p.engine,
                        &p.engine_version,
                        &p.username,
                        &p.password,
                        &p.db_name,
                        &p.account_id,
                        &p.region,
                    )
                    .await
                {
                    Ok(running) => {
                        // Inner block scopes the write guard so it is
                        // dropped before the snapshot save is awaited.
                        {
                            let mut accounts = state.write();
                            if let Some(s) = accounts.get_mut(&p.account_id) {
                                if let Some(inst) = s.instances.get_mut(&p.id) {
                                    inst.db_instance_status = "available".to_string();
                                    inst.endpoint_address = running.endpoint_address.clone();
                                    inst.port = i32::from(running.endpoint_port);
                                    inst.host_port = running.host_port;
                                    inst.container_id = running.container_id;
                                }
                            }
                        }
                        save_snapshot_static(
                            state.clone(),
                            snapshot_store.clone(),
                            snapshot_lock.clone(),
                        )
                        .await;
                        emit_event_static(
                            delivery_bus.as_ref(),
                            RdsSourceType::DbInstance,
                            &p.id,
                            &p.arn,
                            "RDS-EVENT-0088",
                            &["notification"],
                            "DB instance restarted after fakecloud restart",
                        );
                    }
                    Err(error) => {
                        tracing::error!(
                            %error,
                            db_instance_identifier = %p.id,
                            "failed to recover rds backing container after restart",
                        );
                        // Mark the row `failed` (guard scoped as above)
                        // and persist that outcome.
                        {
                            let mut accounts = state.write();
                            if let Some(s) = accounts.get_mut(&p.account_id) {
                                if let Some(inst) = s.instances.get_mut(&p.id) {
                                    inst.db_instance_status = "failed".to_string();
                                }
                            }
                        }
                        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
                    }
                }
            });
        }
    }
488
489    /// Stop the backing container for `db_instance_identifier` and mark
490    /// the row `stopped`. Synchronous wrt the runtime so callers see a
491    /// real `stopped` status by the time the response goes back; mirrors
492    /// the AWS contract that `StartDBInstance`/`StopDBInstance` change
493    /// the visible status immediately.
494    async fn stop_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
495        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
496
497        // Look up the instance first so a missing identifier returns the
498        // declared `DBInstanceNotFoundFault` error rather than a 503 from a
499        // missing container runtime. Conformance probes hit Start/Stop with
500        // synthetic identifiers and expect the documented error shape.
501        let arn = {
502            let accounts = self.state.read();
503            let empty = RdsState::new(&request.account_id, &request.region);
504            let state = accounts.get(&request.account_id).unwrap_or(&empty);
505            state
506                .instances
507                .get(&db_instance_identifier)
508                .map(|i| i.db_instance_arn.clone())
509                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
510        };
511
512        if let Some(runtime) = self.runtime.as_ref() {
513            runtime.stop_container(&db_instance_identifier).await;
514        }
515
516        let instance = {
517            let mut accounts = self.state.write();
518            let state = accounts.get_or_create(&request.account_id);
519            let inst = state
520                .instances
521                .get_mut(&db_instance_identifier)
522                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
523            inst.db_instance_status = "stopped".to_string();
524            inst.container_id = String::new();
525            inst.clone()
526        };
527
528        self.emit_event(
529            RdsSourceType::DbInstance,
530            &db_instance_identifier,
531            &arn,
532            "RDS-EVENT-0089",
533            &["notification"],
534            "DB instance stopped",
535        );
536
537        Ok(AwsResponse::xml(
538            StatusCode::OK,
539            query_response_xml(
540                "StopDBInstance",
541                RDS_NS,
542                &format!(
543                    "<DBInstance>{}</DBInstance>",
544                    db_instance_xml(&instance, Some("stopped"))
545                ),
546                &request.request_id,
547            ),
548        ))
549    }
550
551    /// Restart a stopped DB instance: spin up the backing container and
552    /// flip the row back to `available`. Mirrors AWS `StartDBInstance`.
553    async fn start_db_instance(
554        &self,
555        request: &AwsRequest,
556    ) -> Result<AwsResponse, AwsServiceError> {
557        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
558
559        // Look up the instance first so a missing identifier returns the
560        // declared `DBInstanceNotFoundFault` error rather than a 503 from a
561        // missing container runtime.
562        let instance = {
563            let accounts = self.state.read();
564            let empty = RdsState::new(&request.account_id, &request.region);
565            let state = accounts.get(&request.account_id).unwrap_or(&empty);
566            state
567                .instances
568                .get(&db_instance_identifier)
569                .cloned()
570                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
571        };
572
573        // Flip to `starting` so concurrent DescribeDBInstances callers
574        // see the in-flight state.
575        {
576            let mut accounts = self.state.write();
577            let state = accounts.get_or_create(&request.account_id);
578            if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
579                inst.db_instance_status = "starting".to_string();
580            }
581        }
582
583        let running = if let Some(runtime) = self.runtime.as_ref() {
584            match runtime
585                .ensure_postgres(
586                    &db_instance_identifier,
587                    &instance.engine,
588                    &instance.engine_version,
589                    &instance.master_username,
590                    &instance.master_user_password,
591                    instance
592                        .db_name
593                        .as_deref()
594                        .unwrap_or(default_db_name(&instance.engine)),
595                    &request.account_id,
596                    &request.region,
597                )
598                .await
599            {
600                Ok(r) => Some(r),
601                Err(e) => {
602                    // Roll the row back to `stopped` so the next
603                    // Describe doesn't report a permanently-`starting`
604                    // instance with no container behind it.
605                    let mut accounts = self.state.write();
606                    let state = accounts.get_or_create(&request.account_id);
607                    if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
608                        inst.db_instance_status = "stopped".to_string();
609                    }
610                    return Err(runtime_error_to_service_error(e));
611                }
612            }
613        } else {
614            // No container runtime configured: StartDBInstance should
615            // fail rather than report success against a backend that
616            // does not exist. Roll the row back so subsequent calls
617            // don't see a stuck `starting` state.
618            {
619                let mut accounts = self.state.write();
620                let state = accounts.get_or_create(&request.account_id);
621                if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
622                    inst.db_instance_status = "stopped".to_string();
623                }
624            }
625            return Err(AwsServiceError::aws_error(
626                StatusCode::SERVICE_UNAVAILABLE,
627                "InternalFailure",
628                "Container runtime is not configured; cannot start DB instance",
629            ));
630        };
631
632        let instance = {
633            let mut accounts = self.state.write();
634            let state = accounts.get_or_create(&request.account_id);
635            let inst = state
636                .instances
637                .get_mut(&db_instance_identifier)
638                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
639            inst.db_instance_status = "available".to_string();
640            inst.endpoint_address = "127.0.0.1".to_string();
641            if let Some(r) = running {
642                inst.endpoint_address = r.endpoint_address.clone();
643                inst.port = i32::from(r.endpoint_port);
644                inst.host_port = r.host_port;
645                inst.container_id = r.container_id;
646            }
647            inst.clone()
648        };
649
650        self.emit_event(
651            RdsSourceType::DbInstance,
652            &db_instance_identifier,
653            &instance.db_instance_arn,
654            "RDS-EVENT-0088",
655            &["notification"],
656            "DB instance started",
657        );
658
659        Ok(AwsResponse::xml(
660            StatusCode::OK,
661            query_response_xml(
662                "StartDBInstance",
663                RDS_NS,
664                &format!(
665                    "<DBInstance>{}</DBInstance>",
666                    db_instance_xml(&instance, None)
667                ),
668                &request.request_id,
669            ),
670        ))
671    }
672}
673
/// Whether the given AWS error code is in the AddTagsToResource Smithy
/// model's declared error set. Used so undeclared `*NotFound` codes
/// (OptionGroup, ParameterGroup, EventSubscription, SecurityGroup) get
/// swallowed into a no-op rather than surfaced as a non-modelled error.
///
/// The comparison is exact and case-sensitive.
fn is_declared_add_tags_not_found(code: &str) -> bool {
    /// Error codes declared on AddTagsToResource.
    const DECLARED_CODES: &[&str] = &[
        "BlueGreenDeploymentNotFoundFault",
        "DBClusterNotFoundFault",
        "DBInstanceNotFound",
        "DBProxyEndpointNotFoundFault",
        "DBProxyNotFoundFault",
        "DBProxyTargetGroupNotFoundFault",
        "DBShardGroupNotFound",
        "DBSnapshotNotFound",
        "DBSnapshotTenantDatabaseNotFoundFault",
        "IntegrationNotFoundFault",
        "InvalidDBClusterEndpointStateFault",
        "InvalidDBClusterStateFault",
        "InvalidDBInstanceState",
        "TenantDatabaseNotFound",
    ];
    DECLARED_CODES.contains(&code)
}
700
701/// no store is configured (memory-mode runs).
702async fn save_snapshot_static(
703    state: SharedRdsState,
704    store: Option<Arc<dyn SnapshotStore>>,
705    lock: Arc<AsyncMutex<()>>,
706) {
707    let Some(store) = store else {
708        return;
709    };
710    let _guard = lock.lock().await;
711    let snapshot = RdsSnapshot {
712        schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
713        state: None,
714        accounts: Some(state.read().clone()),
715    };
716    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
717        let bytes = serde_json::to_vec(&snapshot)
718            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
719        store.save(&bytes)
720    })
721    .await;
722    match join {
723        Ok(Ok(())) => {}
724        Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
725        Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
726    }
727}
728
729impl RdsService {
730    /// Return the runtime or a ``ServiceUnavailable`` error if it was not configured.
731    ///
732    /// RDS operations that start, stop, or reach into a database container fail
733    /// with a consistent wire error when the daemon (Docker/Podman) is missing
734    /// rather than each caller restating the message.
735    /// Resolve the container runtime or return a declared error shape.
736    /// `InsufficientDBInstanceCapacity` is declared on every op that
737    /// calls `require_runtime` (Create/Modify/Restore* DB instance and
738    /// Read Replica), and is the closest Smithy-modelled analogue for
739    /// "fakecloud can't satisfy this DB request right now".
740    fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
741        self.runtime.as_ref().ok_or_else(|| {
742            AwsServiceError::aws_error(
743                StatusCode::SERVICE_UNAVAILABLE,
744                "InsufficientDBInstanceCapacity",
745                "Docker/Podman is required for RDS DB instances but is not available",
746            )
747        })
748    }
749}
750
751#[async_trait]
752impl AwsService for RdsService {
753    fn service_name(&self) -> &str {
754        "rds"
755    }
756
757    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
758        // Centralized Smithy-aligned validation. Returns the appropriate
759        // `MissingParameter` / `InvalidParameterValue` error before the
760        // per-action handler runs. Actions without a constraint entry
761        // fall through unchanged.
762        crate::validation::prevalidate(request.action.as_str(), &request)?;
763
764        let mutates = is_mutating_action(request.action.as_str());
765        let result = match request.action.as_str() {
766            "AddTagsToResource" => self.add_tags_to_resource(&request),
767            "CreateDBInstance" => self.create_db_instance(&request).await,
768            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
769            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
770            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
771            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
772            "DeleteDBInstance" => self.delete_db_instance(&request).await,
773            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
774            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
775            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
776            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
777            "DescribeDBInstances" => self.describe_db_instances(&request),
778            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
779            "DescribeDBParameters" => self.describe_db_parameters_real(&request),
780            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
781            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
782            "DescribeOrderableDBInstanceOptions" => {
783                self.describe_orderable_db_instance_options(&request)
784            }
785            "ListTagsForResource" => self.list_tags_for_resource(&request),
786            "ModifyDBInstance" => self.modify_db_instance(&request),
787            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
788            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
789            "RebootDBInstance" => self.reboot_db_instance(&request).await,
790            "StartDBInstance" => self.start_db_instance(&request).await,
791            "StopDBInstance" => self.stop_db_instance(&request).await,
792            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
793            "RestoreDBInstanceFromDBSnapshot" => {
794                self.restore_db_instance_from_db_snapshot(&request).await
795            }
796            "RestoreDBInstanceToPointInTime" => {
797                self.restore_db_instance_to_point_in_time(&request).await
798            }
799            "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
800            "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
801            "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
802            "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
803            "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
804            "RestoreDBClusterToPointInTime" => {
805                self.restore_db_cluster_to_point_in_time(&request).await
806            }
807            _ => self.handle_extra_action(&request),
808        };
809        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
810            self.save_snapshot().await;
811        }
812        result
813    }
814
815    fn supported_actions(&self) -> &[&str] {
816        SUPPORTED_ACTIONS
817    }
818}
819
// NOTE(review): this inherent impl is empty in this file. Presumably the
// per-action handler methods are implemented in other modules — confirm
// whether this placeholder is still needed.
impl RdsService {}
821
822/// Render a single user-set parameter as the XML shape AWS emits inside
823/// `DescribeDB(Cluster)Parameters` responses. We don't store metadata
824/// alongside user values so we report `dynamic`/`string` defaults.
825pub(crate) fn render_user_parameter_xml(name: &str, value: &str) -> String {
826    format!(
827        "      <Parameter>\n        <ParameterName>{}</ParameterName>\n        <ParameterValue>{}</ParameterValue>\n        <Source>user</Source>\n        <ApplyType>dynamic</ApplyType>\n        <DataType>string</DataType>\n        <IsModifiable>true</IsModifiable>\n      </Parameter>\n",
828        xml_escape(name),
829        xml_escape(value),
830    )
831}
832
833/// Render a single engine-default parameter as the XML shape AWS emits
834/// inside `DescribeDB(Cluster)Parameters` and
835/// `DescribeEngineDefault(Cluster)Parameters` responses.
836pub(crate) fn render_engine_default_parameter_xml(
837    default: &crate::state::EngineDefaultParameter,
838) -> String {
839    format!(
840        "      <Parameter>\n        <ParameterName>{}</ParameterName>\n        <ParameterValue>{}</ParameterValue>\n        <Source>engine-default</Source>\n        <ApplyType>{}</ApplyType>\n        <DataType>{}</DataType>\n        <AllowedValues>{}</AllowedValues>\n        <IsModifiable>{}</IsModifiable>\n      </Parameter>\n",
841        xml_escape(default.name),
842        xml_escape(default.value),
843        xml_escape(default.apply_type),
844        xml_escape(default.data_type),
845        xml_escape(default.allowed_values),
846        default.is_modifiable,
847    )
848}
849
850/// Parse `Parameters.{Parameter|member}.N.{ParameterName,ParameterValue,ApplyMethod}`
851/// from a Query-protocol request. AWS RDS uses `Parameters.Parameter.N`
852/// (the `Parameter` list location name from the Smithy model); we also
853/// accept the generic `Parameters.member.N` form so hand-built clients
854/// using the default Query list shape keep working. Skips members
855/// missing a name or value; ApplyMethod is accepted but ignored
856/// (single-state model).
857pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<(String, String)> {
858    let mut out = Vec::new();
859    for prefix in ["Parameters.Parameter", "Parameters.member"] {
860        let mut index = 1;
861        loop {
862            let name_key = format!("{prefix}.{index}.ParameterName");
863            let value_key = format!("{prefix}.{index}.ParameterValue");
864            let name = optional_query_param(request, &name_key);
865            let value = optional_query_param(request, &value_key);
866            if name.is_none() && value.is_none() {
867                break;
868            }
869            if let (Some(n), Some(v)) = (name, value) {
870                if !n.is_empty() {
871                    out.push((n, v));
872                }
873            }
874            index += 1;
875        }
876    }
877    out
878}
879
/// Translate an AWS-style log file name (such as `error/postgres.log`)
/// into the absolute path used inside the running container.
///
/// The two PostgreSQL names map to the same file regardless of `engine`.
/// The MySQL names are only translated when `engine` is `mysql` or
/// `mariadb`. Any other combination returns `log_file_name` unchanged, so
/// callers can also request arbitrary paths.
fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
    let is_mysql_family = matches!(engine, "mysql" | "mariadb");
    let container_path = match log_file_name {
        "error/postgres.log" | "trace/postgres-trace.log" => "/var/log/postgresql/postgresql.log",
        "error/mysql-error.log" if is_mysql_family => "/var/log/mysql/error.log",
        "slowquery/mysql-slowquery.log" if is_mysql_family => "/var/log/mysql/slow.log",
        passthrough => passthrough,
    };
    container_path.to_owned()
}
894
/// One page of results together with an optional continuation marker.
///
/// `Debug`, `Clone` and `PartialEq` are derived so pages can be logged,
/// copied and compared in tests; the derived impls only exist when `T`
/// itself implements the corresponding trait.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct PaginationResult<T> {
    /// Entries carried by this result.
    items: Vec<T>,
    /// Marker associated with this result, if any. Presumably the token a
    /// caller sends back to fetch the following page — confirm against the
    /// pagination helper that builds this struct.
    next_marker: Option<String>,
}
899
900/// Attach `instance_id` to the cluster's `DBClusterMembers` array,
901/// promoting it to writer when the cluster has none. Idempotent:
902/// re-attaching an existing member is a no-op.
903fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
904    use serde_json::{json, Value};
905    let Some(map) = state.extras.get_mut("clusters") else {
906        return;
907    };
908    let Some(entry) = map.get_mut(cluster_id) else {
909        return;
910    };
911    let Some(obj) = entry.as_object_mut() else {
912        return;
913    };
914    let mut members: Vec<Value> = obj
915        .get("DBClusterMembers")
916        .and_then(|v| v.as_array())
917        .cloned()
918        .unwrap_or_default();
919    if members
920        .iter()
921        .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
922    {
923        return;
924    }
925    let has_writer = members
926        .iter()
927        .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
928    let promotion_tier = (members.len() as i64) + 1;
929    members.push(json!({
930        "DBInstanceIdentifier": instance_id,
931        "IsClusterWriter": !has_writer,
932        "DBClusterParameterGroupStatus": "in-sync",
933        "PromotionTier": promotion_tier,
934    }));
935    obj.insert("DBClusterMembers".to_string(), Value::Array(members));
936    if !has_writer {
937        obj.insert(
938            "WriterDBInstanceIdentifier".to_string(),
939            Value::String(instance_id.to_string()),
940        );
941    }
942}
943
944#[path = "../service_helpers.rs"]
945mod service_helpers;
946pub(crate) use service_helpers::*;
947
948#[cfg(test)]
949#[path = "../service_tests.rs"]
950mod tests;