Skip to main content

fakecloud_rds/service/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20    SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
/// XML namespace URI handed to `query_response_xml` when building RDS
/// query-protocol responses in this module.
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
/// RDS API action names, one string per action. The entries appear to be
/// kept in ASCII order.
///
/// NOTE(review): the consumer of this table is outside this view; presumably
/// it is what the service advertises as its supported actions — confirm
/// before relying on that. Note that the dispatcher visible in this file
/// only matches a subset of these names directly.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddRoleToDBCluster",
    "AddRoleToDBInstance",
    "AddSourceIdentifierToSubscription",
    "AddTagsToResource",
    "ApplyPendingMaintenanceAction",
    "AuthorizeDBSecurityGroupIngress",
    "BacktrackDBCluster",
    "CancelExportTask",
    "CopyDBClusterParameterGroup",
    "CopyDBClusterSnapshot",
    "CopyDBParameterGroup",
    "CopyDBSnapshot",
    "CopyOptionGroup",
    "CreateBlueGreenDeployment",
    "CreateCustomDBEngineVersion",
    "CreateDBCluster",
    "CreateDBClusterEndpoint",
    "CreateDBClusterParameterGroup",
    "CreateDBClusterSnapshot",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBProxy",
    "CreateDBProxyEndpoint",
    "CreateDBSecurityGroup",
    "CreateDBShardGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "CreateEventSubscription",
    "CreateGlobalCluster",
    "CreateIntegration",
    "CreateOptionGroup",
    "CreateTenantDatabase",
    "DeleteBlueGreenDeployment",
    "DeleteCustomDBEngineVersion",
    "DeleteDBCluster",
    "DeleteDBClusterAutomatedBackup",
    "DeleteDBClusterEndpoint",
    "DeleteDBClusterParameterGroup",
    "DeleteDBClusterSnapshot",
    "DeleteDBInstance",
    "DeleteDBInstanceAutomatedBackup",
    "DeleteDBParameterGroup",
    "DeleteDBProxy",
    "DeleteDBProxyEndpoint",
    "DeleteDBSecurityGroup",
    "DeleteDBShardGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DeleteEventSubscription",
    "DeleteGlobalCluster",
    "DeleteIntegration",
    "DeleteOptionGroup",
    "DeleteTenantDatabase",
    "DeregisterDBProxyTargets",
    "DescribeAccountAttributes",
    "DescribeBlueGreenDeployments",
    "DescribeCertificates",
    "DescribeDBClusterAutomatedBackups",
    "DescribeDBClusterBacktracks",
    "DescribeDBClusterEndpoints",
    "DescribeDBClusterParameterGroups",
    "DescribeDBClusterParameters",
    "DescribeDBClusterSnapshotAttributes",
    "DescribeDBClusterSnapshots",
    "DescribeDBClusters",
    "DescribeDBEngineVersions",
    "DescribeDBInstanceAutomatedBackups",
    "DescribeDBInstances",
    "DescribeDBLogFiles",
    "DescribeDBMajorEngineVersions",
    "DescribeDBParameterGroups",
    "DescribeDBParameters",
    "DescribeDBProxies",
    "DescribeDBProxyEndpoints",
    "DescribeDBProxyTargetGroups",
    "DescribeDBProxyTargets",
    "DescribeDBRecommendations",
    "DescribeDBSecurityGroups",
    "DescribeDBShardGroups",
    "DescribeDBSnapshotAttributes",
    "DescribeDBSnapshotTenantDatabases",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeEngineDefaultClusterParameters",
    "DescribeEngineDefaultParameters",
    "DescribeEventCategories",
    "DescribeEventSubscriptions",
    "DescribeEvents",
    "DescribeExportTasks",
    "DescribeGlobalClusters",
    "DescribeIntegrations",
    "DescribeOptionGroupOptions",
    "DescribeOptionGroups",
    "DescribeOrderableDBInstanceOptions",
    "DescribePendingMaintenanceActions",
    "DescribeReservedDBInstances",
    "DescribeReservedDBInstancesOfferings",
    "DescribeServerlessV2PlatformVersions",
    "DescribeSourceRegions",
    "DescribeTenantDatabases",
    "DescribeValidDBInstanceModifications",
    "DisableHttpEndpoint",
    "DownloadDBLogFilePortion",
    "EnableHttpEndpoint",
    "FailoverDBCluster",
    "FailoverGlobalCluster",
    "ListTagsForResource",
    "ModifyActivityStream",
    "ModifyCertificates",
    "ModifyCurrentDBClusterCapacity",
    "ModifyCustomDBEngineVersion",
    "ModifyDBCluster",
    "ModifyDBClusterEndpoint",
    "ModifyDBClusterParameterGroup",
    "ModifyDBClusterSnapshotAttribute",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBProxy",
    "ModifyDBProxyEndpoint",
    "ModifyDBProxyTargetGroup",
    "ModifyDBRecommendation",
    "ModifyDBShardGroup",
    "ModifyDBSnapshot",
    "ModifyDBSnapshotAttribute",
    "ModifyDBSubnetGroup",
    "ModifyEventSubscription",
    "ModifyGlobalCluster",
    "ModifyIntegration",
    "ModifyOptionGroup",
    "ModifyTenantDatabase",
    "PromoteReadReplica",
    "PromoteReadReplicaDBCluster",
    "PurchaseReservedDBInstancesOffering",
    "RebootDBCluster",
    "RebootDBInstance",
    "RebootDBShardGroup",
    "RegisterDBProxyTargets",
    "RemoveFromGlobalCluster",
    "RemoveRoleFromDBCluster",
    "RemoveRoleFromDBInstance",
    "RemoveSourceIdentifierFromSubscription",
    "RemoveTagsFromResource",
    "ResetDBClusterParameterGroup",
    "ResetDBParameterGroup",
    "RestoreDBClusterFromS3",
    "RestoreDBClusterFromSnapshot",
    "RestoreDBClusterToPointInTime",
    "RestoreDBInstanceFromDBSnapshot",
    "RestoreDBInstanceFromS3",
    "RestoreDBInstanceToPointInTime",
    "RevokeDBSecurityGroupIngress",
    "StartActivityStream",
    "StartDBCluster",
    "StartDBInstance",
    "StartDBInstanceAutomatedBackupsReplication",
    "StartExportTask",
    "StopActivityStream",
    "StopDBCluster",
    "StopDBInstance",
    "StopDBInstanceAutomatedBackupsReplication",
    "SwitchoverBlueGreenDeployment",
    "SwitchoverGlobalCluster",
    "SwitchoverReadReplica",
];
191
/// RDS service handler: the shared state plus the optional integrations
/// (container runtime, snapshot persistence, event delivery) that the
/// builder-style `with_*` methods attach.
pub struct RdsService {
    /// Shared RDS state read and written by the request handlers.
    pub(crate) state: SharedRdsState,
    /// Container runtime used to start and stop backing databases; `None`
    /// until `with_runtime` is called.
    runtime: Option<Arc<RdsRuntime>>,
    /// Destination for persisted snapshots; when `None`, snapshot saves
    /// return without doing anything.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of each snapshot save.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Delivery bus handed to the event emitters; `None` until
    /// `with_delivery_bus` is called.
    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
}
199
/// Kind of RDS resource an `aws.rds` EventBridge event is about. Each
/// variant carries three fixed spellings: the EventBridge `SourceType`,
/// the `DescribeEvents` `SourceType`, and the event `detail-type`.
#[derive(Clone, Copy)]
#[allow(dead_code, clippy::enum_variant_names)]
pub(crate) enum RdsSourceType {
    DbInstance,
    DbSnapshot,
    DbParameterGroup,
    DbCluster,
    DbClusterSnapshot,
}

impl RdsSourceType {
    /// Single lookup table behind the three accessors below. The tuple is
    /// `(EventBridge SourceType, DescribeEvents SourceType, detail-type)`.
    /// Kept as an exhaustive `match` so adding a variant forces all three
    /// spellings to be supplied together.
    fn spellings(self) -> (&'static str, &'static str, &'static str) {
        match self {
            RdsSourceType::DbInstance => ("DB_INSTANCE", "db-instance", "RDS DB Instance Event"),
            RdsSourceType::DbSnapshot => ("DB_SNAPSHOT", "db-snapshot", "RDS DB Snapshot Event"),
            RdsSourceType::DbParameterGroup => (
                "DB_PARAMETER_GROUP",
                "db-parameter-group",
                "RDS DB Parameter Group Event",
            ),
            RdsSourceType::DbCluster => ("DB_CLUSTER", "db-cluster", "RDS DB Cluster Event"),
            RdsSourceType::DbClusterSnapshot => (
                "DB_CLUSTER_SNAPSHOT",
                "db-cluster-snapshot",
                "RDS DB Cluster Snapshot Event",
            ),
        }
    }

    /// EventBridge `SourceType` string, in the SCREAMING_SNAKE form used
    /// in the `aws.rds` event detail.
    fn as_str(self) -> &'static str {
        let (eventbridge, _, _) = self.spellings();
        eventbridge
    }

    /// `DescribeEvents` `SourceType` filter / response value. This is the
    /// kebab-case form (`db-instance`, `db-cluster`, ...), which differs
    /// from the EventBridge spelling returned by [`Self::as_str`].
    pub(crate) fn describe_events_str(self) -> &'static str {
        let (_, describe_events, _) = self.spellings();
        describe_events
    }

    /// Human-readable EventBridge `detail-type` for this source type.
    fn detail_type(self) -> &'static str {
        let (_, _, detail_type) = self.spellings();
        detail_type
    }
}
248
249mod cluster_snapshots;
250mod engine;
251mod instances;
252mod log_files;
253mod parameter_groups;
254mod replicas;
255mod restore;
256mod snapshots;
257mod subnet_groups;
258mod tags;
259
impl RdsService {
    /// Borrow the shared RDS state handle held by this service, for use by
    /// other code in this crate.
    pub(crate) fn state_handle(&self) -> &SharedRdsState {
        &self.state
    }
}
265
266impl RdsService {
267    pub fn new(state: SharedRdsState) -> Self {
268        Self {
269            state,
270            runtime: None,
271            snapshot_store: None,
272            snapshot_lock: Arc::new(AsyncMutex::new(())),
273            delivery_bus: None,
274        }
275    }
276
277    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
278        self.runtime = Some(runtime);
279        self
280    }
281
    /// Crate-internal accessor for the optional runtime; needed by the
    /// extras handler so cluster snapshot/restore paths can dump and
    /// replay member databases via the live container runtime.
    ///
    /// Returns `None` when no runtime was attached via `with_runtime`.
    pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
        self.runtime.as_ref()
    }
288
289    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
290        self.snapshot_store = Some(store);
291        self
292    }
293
294    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
295        self.delivery_bus = Some(bus);
296        self
297    }
298
299    /// Emit an `aws.rds` EventBridge event mirroring the AWS RDS event schema.
300    /// Also records into the per-account events ring so DescribeEvents
301    /// can serve the row. No-op for the EventBridge side when the bus
302    /// isn't wired (tests, minimal configs).
303    pub(crate) fn emit_event(
304        &self,
305        source_type: RdsSourceType,
306        source_identifier: &str,
307        source_arn: &str,
308        event_id: &str,
309        event_categories: &[&str],
310        message: &str,
311    ) {
312        // Source the account_id off the source_arn (segment 4) — that's
313        // the canonical ARN form for RDS resources.
314        let account_id = source_arn.split(':').nth(4).unwrap_or("");
315        emit_event_static_with_state(
316            self.delivery_bus.as_ref(),
317            Some(&self.state),
318            if account_id.is_empty() {
319                None
320            } else {
321                Some(account_id)
322            },
323            source_type,
324            source_identifier,
325            source_arn,
326            event_id,
327            event_categories,
328            message,
329        );
330    }
331
332    async fn save_snapshot(&self) {
333        save_snapshot_static(
334            self.state.clone(),
335            self.snapshot_store.clone(),
336            self.snapshot_lock.clone(),
337        )
338        .await;
339    }
340
    /// Recreate the backing Docker/Podman containers for persisted DB
    /// instances after a fakecloud restart. Without this, persistent
    /// mode loads the row back into memory with `db_instance_status =
    /// available` but the container is gone, so the endpoint is dead
    /// (issue #1338). Each candidate is flipped to `starting`
    /// synchronously, then a background task brings the container back
    /// and flips it back to `available`. Instances persisted as
    /// `stopped` are skipped — `StartDBInstance` revives those.
    ///
    /// Does nothing when no container runtime is configured. The spawned
    /// tasks are not awaited: this function returns once they have been
    /// scheduled, and each task reports its own outcome (status update,
    /// snapshot save, and an event on success or an error log on failure).
    pub async fn recover_persisted_containers(&self) {
        let Some(runtime) = self.runtime.clone() else {
            return;
        };

        // Owned copy of everything a background task needs, so the task
        // does not have to borrow from the locked state.
        struct Pending {
            account_id: String,
            region: String,
            id: String,
            arn: String,
            engine: String,
            engine_version: String,
            username: String,
            password: String,
            db_name: String,
            tags: Vec<crate::state::RdsTag>,
        }

        // Collect candidates under a single write lock. The guard is scoped
        // to this block so it is released before any task is spawned.
        let pending: Vec<Pending> = {
            let mut accounts = self.state.write();
            let mut out = Vec::new();
            for (_, state) in accounts.iter_mut() {
                let account_id = state.account_id.clone();
                let region = state.region.clone();
                for (id, inst) in state.instances.iter_mut() {
                    // "creating" is included so an instance whose background
                    // create task hadn't finished (and re-saved it as
                    // "available") when the process crashed is resumed rather
                    // than silently dropped on restart — the API already
                    // returned it to the client, so DescribeDBInstances must
                    // not lose it (bug-hunt 2026-06-13, finding 4.3). Recovery
                    // re-drives it through `ensure_*` to a live container.
                    if !matches!(
                        inst.db_instance_status.as_str(),
                        "creating"
                            | "available"
                            | "starting"
                            | "modifying"
                            | "rebooting"
                            | "backing-up"
                    ) {
                        continue;
                    }
                    // In-memory flip only; nothing is saved to the snapshot
                    // store until the background task finishes.
                    inst.db_instance_status = "starting".to_string();
                    out.push(Pending {
                        account_id: account_id.clone(),
                        region: region.clone(),
                        id: id.clone(),
                        arn: inst.db_instance_arn.clone(),
                        engine: inst.engine.clone(),
                        engine_version: inst.engine_version.clone(),
                        username: inst.master_username.clone(),
                        password: inst.master_user_password.clone(),
                        db_name: inst
                            .db_name
                            .clone()
                            .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
                        tags: inst.tags.clone(),
                    });
                }
            }
            out
        };

        if pending.is_empty() {
            return;
        }
        tracing::info!(
            count = pending.len(),
            "recovering backing containers for persisted rds instances",
        );

        // One detached task per instance, each with its own clones of the
        // shared handles.
        for p in pending {
            let runtime = runtime.clone();
            let state = self.state.clone();
            let snapshot_store = self.snapshot_store.clone();
            let snapshot_lock = self.snapshot_lock.clone();
            let delivery_bus = self.delivery_bus.clone();
            tokio::spawn(async move {
                match runtime
                    .ensure_postgres(
                        &p.id,
                        &p.engine,
                        &p.engine_version,
                        &p.username,
                        &p.password,
                        &p.db_name,
                        &p.account_id,
                        &p.region,
                        &p.tags,
                    )
                    .await
                {
                    Ok(running) => {
                        // Inner block: drop the write guard before the
                        // `.await` on the snapshot save below.
                        {
                            let mut accounts = state.write();
                            if let Some(s) = accounts.get_mut(&p.account_id) {
                                // The row may have gone away while the
                                // container was starting; then only the save
                                // and the event below still happen.
                                if let Some(inst) = s.instances.get_mut(&p.id) {
                                    inst.db_instance_status = "available".to_string();
                                    inst.endpoint_address = running.endpoint_address.clone();
                                    inst.port = i32::from(running.endpoint_port);
                                    inst.host_port = running.host_port;
                                    inst.container_id = running.container_id;
                                }
                            }
                        }
                        save_snapshot_static(
                            state.clone(),
                            snapshot_store.clone(),
                            snapshot_lock.clone(),
                        )
                        .await;
                        emit_event_static(
                            delivery_bus.as_ref(),
                            RdsSourceType::DbInstance,
                            &p.id,
                            &p.arn,
                            "RDS-EVENT-0088",
                            &["notification"],
                            "DB instance restarted after fakecloud restart",
                        );
                    }
                    Err(error) => {
                        tracing::error!(
                            %error,
                            db_instance_identifier = %p.id,
                            "failed to recover rds backing container after restart",
                        );
                        // Mark the row `failed` and persist that, again
                        // releasing the guard before awaiting.
                        {
                            let mut accounts = state.write();
                            if let Some(s) = accounts.get_mut(&p.account_id) {
                                if let Some(inst) = s.instances.get_mut(&p.id) {
                                    inst.db_instance_status = "failed".to_string();
                                }
                            }
                        }
                        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
                    }
                }
            });
        }
    }
491
492    /// Stop the backing container for `db_instance_identifier` and mark
493    /// the row `stopped`. Synchronous wrt the runtime so callers see a
494    /// real `stopped` status by the time the response goes back; mirrors
495    /// the AWS contract that `StartDBInstance`/`StopDBInstance` change
496    /// the visible status immediately.
497    async fn stop_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
498        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
499
500        // Look up the instance first so a missing identifier returns the
501        // declared `DBInstanceNotFoundFault` error rather than a 503 from a
502        // missing container runtime. Conformance probes hit Start/Stop with
503        // synthetic identifiers and expect the documented error shape.
504        let arn = {
505            let accounts = self.state.read();
506            let empty = RdsState::new(&request.account_id, &request.region);
507            let state = accounts.get(&request.account_id).unwrap_or(&empty);
508            state
509                .instances
510                .get(&db_instance_identifier)
511                .map(|i| i.db_instance_arn.clone())
512                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
513        };
514
515        if let Some(runtime) = self.runtime.as_ref() {
516            runtime.stop_container(&db_instance_identifier).await;
517        }
518
519        let instance = {
520            let mut accounts = self.state.write();
521            let state = accounts.get_or_create(&request.account_id);
522            let inst = state
523                .instances
524                .get_mut(&db_instance_identifier)
525                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
526            inst.db_instance_status = "stopped".to_string();
527            inst.container_id = String::new();
528            inst.clone()
529        };
530
531        self.emit_event(
532            RdsSourceType::DbInstance,
533            &db_instance_identifier,
534            &arn,
535            "RDS-EVENT-0089",
536            &["notification"],
537            "DB instance stopped",
538        );
539
540        Ok(AwsResponse::xml(
541            StatusCode::OK,
542            query_response_xml(
543                "StopDBInstance",
544                RDS_NS,
545                &format!(
546                    "<DBInstance>{}</DBInstance>",
547                    db_instance_xml(&instance, Some("stopped"))
548                ),
549                &request.request_id,
550            ),
551        ))
552    }
553
554    /// Restart a stopped DB instance: spin up the backing container and
555    /// flip the row back to `available`. Mirrors AWS `StartDBInstance`.
556    async fn start_db_instance(
557        &self,
558        request: &AwsRequest,
559    ) -> Result<AwsResponse, AwsServiceError> {
560        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
561
562        // Look up the instance first so a missing identifier returns the
563        // declared `DBInstanceNotFoundFault` error rather than a 503 from a
564        // missing container runtime.
565        let instance = {
566            let accounts = self.state.read();
567            let empty = RdsState::new(&request.account_id, &request.region);
568            let state = accounts.get(&request.account_id).unwrap_or(&empty);
569            state
570                .instances
571                .get(&db_instance_identifier)
572                .cloned()
573                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
574        };
575
576        // Flip to `starting` so concurrent DescribeDBInstances callers
577        // see the in-flight state.
578        {
579            let mut accounts = self.state.write();
580            let state = accounts.get_or_create(&request.account_id);
581            if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
582                inst.db_instance_status = "starting".to_string();
583            }
584        }
585
586        let running = if let Some(runtime) = self.runtime.as_ref() {
587            match runtime
588                .ensure_postgres(
589                    &db_instance_identifier,
590                    &instance.engine,
591                    &instance.engine_version,
592                    &instance.master_username,
593                    &instance.master_user_password,
594                    instance
595                        .db_name
596                        .as_deref()
597                        .unwrap_or(default_db_name(&instance.engine)),
598                    &request.account_id,
599                    &request.region,
600                    &instance.tags,
601                )
602                .await
603            {
604                Ok(r) => Some(r),
605                Err(e) => {
606                    // Roll the row back to `stopped` so the next
607                    // Describe doesn't report a permanently-`starting`
608                    // instance with no container behind it.
609                    let mut accounts = self.state.write();
610                    let state = accounts.get_or_create(&request.account_id);
611                    if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
612                        inst.db_instance_status = "stopped".to_string();
613                    }
614                    return Err(runtime_error_to_service_error(e));
615                }
616            }
617        } else {
618            // No container runtime configured: StartDBInstance should
619            // fail rather than report success against a backend that
620            // does not exist. Roll the row back so subsequent calls
621            // don't see a stuck `starting` state.
622            {
623                let mut accounts = self.state.write();
624                let state = accounts.get_or_create(&request.account_id);
625                if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
626                    inst.db_instance_status = "stopped".to_string();
627                }
628            }
629            return Err(AwsServiceError::aws_error(
630                StatusCode::SERVICE_UNAVAILABLE,
631                "InternalFailure",
632                "Container runtime is not configured; cannot start DB instance",
633            ));
634        };
635
636        let instance = {
637            let mut accounts = self.state.write();
638            let state = accounts.get_or_create(&request.account_id);
639            let inst = state
640                .instances
641                .get_mut(&db_instance_identifier)
642                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
643            inst.db_instance_status = "available".to_string();
644            inst.endpoint_address = "127.0.0.1".to_string();
645            if let Some(r) = running {
646                inst.endpoint_address = r.endpoint_address.clone();
647                inst.port = i32::from(r.endpoint_port);
648                inst.host_port = r.host_port;
649                inst.container_id = r.container_id;
650            }
651            inst.clone()
652        };
653
654        self.emit_event(
655            RdsSourceType::DbInstance,
656            &db_instance_identifier,
657            &instance.db_instance_arn,
658            "RDS-EVENT-0088",
659            &["notification"],
660            "DB instance started",
661        );
662
663        Ok(AwsResponse::xml(
664            StatusCode::OK,
665            query_response_xml(
666                "StartDBInstance",
667                RDS_NS,
668                &format!(
669                    "<DBInstance>{}</DBInstance>",
670                    db_instance_xml(&instance, None)
671                ),
672                &request.request_id,
673            ),
674        ))
675    }
676}
677
/// Whether the given AWS error code is in the AddTagsToResource Smithy
/// model's declared error set. Used so undeclared `*NotFound` codes
/// (OptionGroup, ParameterGroup, EventSubscription, SecurityGroup) get
/// swallowed into a no-op rather than surfaced as a non-modelled error.
///
/// Despite the name, the accepted set also contains the three
/// `Invalid…State` codes listed below, not only not-found codes.
fn is_declared_add_tags_not_found(code: &str) -> bool {
    matches!(
        code,
        "BlueGreenDeploymentNotFoundFault"
            | "DBClusterNotFoundFault"
            | "DBInstanceNotFound"
            | "DBProxyEndpointNotFoundFault"
            | "DBProxyNotFoundFault"
            | "DBProxyTargetGroupNotFoundFault"
            | "DBShardGroupNotFound"
            | "DBSnapshotNotFound"
            | "DBSnapshotTenantDatabaseNotFoundFault"
            | "IntegrationNotFoundFault"
            | "InvalidDBClusterEndpointStateFault"
            | "InvalidDBClusterStateFault"
            | "InvalidDBInstanceState"
            | "TenantDatabaseNotFound"
    )
}

/// Persist the current `RdsState` to the configured snapshot store. Free
/// function so background tasks (e.g. the create-DB-instance container-start
/// task) can save without holding a `&RdsService`. Returns immediately when
/// no store is configured (memory-mode runs).
///
/// Failures are logged and not returned to the caller.
async fn save_snapshot_static(
    state: SharedRdsState,
    store: Option<Arc<dyn SnapshotStore>>,
    lock: Arc<AsyncMutex<()>>,
) {
    let Some(store) = store else {
        return;
    };
    // Held until the function returns, so at most one save runs at a time
    // per lock.
    let _guard = lock.lock().await;
    let snapshot = RdsSnapshot {
        schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
        // NOTE(review): presumably a legacy single-state field kept for
        // older snapshot layouts — confirm against `RdsSnapshot`.
        state: None,
        // Clone taken under a read guard that lives only for this expression.
        accounts: Some(state.read().clone()),
    };
    // Serialization and the store write run on the blocking thread pool.
    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
        let bytes = serde_json::to_vec(&snapshot)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
        store.save(&bytes)
    })
    .await;
    match join {
        Ok(Ok(())) => {}
        Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
        Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
    }
}
732
733impl RdsService {
734    /// Return the runtime or a ``ServiceUnavailable`` error if it was not configured.
735    ///
736    /// RDS operations that start, stop, or reach into a database container fail
737    /// with a consistent wire error when the daemon (Docker/Podman) is missing
738    /// rather than each caller restating the message.
739    /// Resolve the container runtime or return a declared error shape.
740    /// `InsufficientDBInstanceCapacity` is declared on every op that
741    /// calls `require_runtime` (Create/Modify/Restore* DB instance and
742    /// Read Replica), and is the closest Smithy-modelled analogue for
743    /// "fakecloud can't satisfy this DB request right now".
744    fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
745        self.runtime.as_ref().ok_or_else(|| {
746            AwsServiceError::aws_error(
747                StatusCode::SERVICE_UNAVAILABLE,
748                "InsufficientDBInstanceCapacity",
749                "Docker/Podman is required for RDS DB instances but is not available",
750            )
751        })
752    }
753
    /// Build a hook that persists the current state when invoked, or `None` in
    /// memory mode. The CloudFormation provisioner mutates `state` directly and
    /// uses this to write a CFN-provisioned resource through to disk.
    ///
    /// Each invocation of the hook returns a boxed future that runs
    /// `save_snapshot_static` with this service's state, store and lock.
    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
        // `?` yields `None` when no snapshot store is configured.
        let store = self.snapshot_store.clone()?;
        let state = self.state.clone();
        let lock = self.snapshot_lock.clone();
        Some(Arc::new(move || {
            // Re-clone per call: the closure keeps its own handles so it can
            // be invoked repeatedly, while each future owns a separate set.
            let state = state.clone();
            let store = store.clone();
            let lock = lock.clone();
            Box::pin(async move {
                save_snapshot_static(state, Some(store), lock).await;
            })
        }))
    }
770}
771
772#[async_trait]
773impl AwsService for RdsService {
774    fn service_name(&self) -> &str {
775        "rds"
776    }
777
778    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
779        // Centralized Smithy-aligned validation. Returns the appropriate
780        // `MissingParameter` / `InvalidParameterValue` error before the
781        // per-action handler runs. Actions without a constraint entry
782        // fall through unchanged.
783        crate::validation::prevalidate(request.action.as_str(), &request)?;
784
785        let mutates = is_mutating_action(request.action.as_str());
786        let result = match request.action.as_str() {
787            "AddTagsToResource" => self.add_tags_to_resource(&request),
788            "CreateDBInstance" => self.create_db_instance(&request).await,
789            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
790            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
791            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
792            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
793            "DeleteDBInstance" => self.delete_db_instance(&request).await,
794            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
795            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
796            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
797            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
798            "DescribeDBInstances" => self.describe_db_instances(&request),
799            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
800            "DescribeDBParameters" => self.describe_db_parameters_real(&request),
801            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
802            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
803            "DescribeOrderableDBInstanceOptions" => {
804                self.describe_orderable_db_instance_options(&request)
805            }
806            "ListTagsForResource" => self.list_tags_for_resource(&request),
807            "ModifyDBInstance" => self.modify_db_instance(&request),
808            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
809            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
810            "RebootDBInstance" => self.reboot_db_instance(&request).await,
811            "StartDBInstance" => self.start_db_instance(&request).await,
812            "StopDBInstance" => self.stop_db_instance(&request).await,
813            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
814            "RestoreDBInstanceFromDBSnapshot" => {
815                self.restore_db_instance_from_db_snapshot(&request).await
816            }
817            "RestoreDBInstanceToPointInTime" => {
818                self.restore_db_instance_to_point_in_time(&request).await
819            }
820            "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
821            "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
822            "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
823            "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
824            "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
825            "RestoreDBClusterToPointInTime" => {
826                self.restore_db_cluster_to_point_in_time(&request).await
827            }
828            _ => self.handle_extra_action(&request),
829        };
830        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
831            self.save_snapshot().await;
832        }
833        result
834    }
835
836    fn supported_actions(&self) -> &[&str] {
837        SUPPORTED_ACTIONS
838    }
839}
840
// Empty inherent impl: it declares no methods or associated items.
// NOTE(review): looks like a leftover placeholder — confirm nothing relies
// on it before removing.
impl RdsService {}
842
843/// Render a single user-set parameter as the XML shape AWS emits inside
844/// `DescribeDB(Cluster)Parameters` responses. We don't store metadata
845/// alongside user values so we report `dynamic`/`string` defaults.
846pub(crate) fn render_user_parameter_xml(name: &str, value: &str) -> String {
847    format!(
848        "      <Parameter>\n        <ParameterName>{}</ParameterName>\n        <ParameterValue>{}</ParameterValue>\n        <Source>user</Source>\n        <ApplyType>dynamic</ApplyType>\n        <DataType>string</DataType>\n        <IsModifiable>true</IsModifiable>\n      </Parameter>\n",
849        xml_escape(name),
850        xml_escape(value),
851    )
852}
853
854/// Render a single engine-default parameter as the XML shape AWS emits
855/// inside `DescribeDB(Cluster)Parameters` and
856/// `DescribeEngineDefault(Cluster)Parameters` responses.
857pub(crate) fn render_engine_default_parameter_xml(
858    default: &crate::state::EngineDefaultParameter,
859) -> String {
860    format!(
861        "      <Parameter>\n        <ParameterName>{}</ParameterName>\n        <ParameterValue>{}</ParameterValue>\n        <Source>engine-default</Source>\n        <ApplyType>{}</ApplyType>\n        <DataType>{}</DataType>\n        <AllowedValues>{}</AllowedValues>\n        <IsModifiable>{}</IsModifiable>\n      </Parameter>\n",
862        xml_escape(default.name),
863        xml_escape(default.value),
864        xml_escape(default.apply_type),
865        xml_escape(default.data_type),
866        xml_escape(default.allowed_values),
867        default.is_modifiable,
868    )
869}
870
871/// Parse `Parameters.{Parameter|member}.N.{ParameterName,ParameterValue,ApplyMethod}`
872/// from a Query-protocol request. AWS RDS uses `Parameters.Parameter.N`
873/// (the `Parameter` list location name from the Smithy model); we also
874/// accept the generic `Parameters.member.N` form so hand-built clients
875/// using the default Query list shape keep working. Skips members
876/// missing a name or value; ApplyMethod is accepted but ignored
877/// (single-state model).
878pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<(String, String)> {
879    let mut out = Vec::new();
880    for prefix in ["Parameters.Parameter", "Parameters.member"] {
881        let mut index = 1;
882        loop {
883            let name_key = format!("{prefix}.{index}.ParameterName");
884            let value_key = format!("{prefix}.{index}.ParameterValue");
885            let name = optional_query_param(request, &name_key);
886            let value = optional_query_param(request, &value_key);
887            if name.is_none() && value.is_none() {
888                break;
889            }
890            if let (Some(n), Some(v)) = (name, value) {
891                if !n.is_empty() {
892                    out.push((n, v));
893                }
894            }
895            index += 1;
896        }
897    }
898    out
899}
900
/// Translate an AWS-style log file name (for example `error/postgres.log`)
/// into the absolute path of that log inside the running container.
///
/// The two PostgreSQL names map to the same file regardless of `engine`;
/// the MySQL names are only translated for the `mysql` and `mariadb`
/// engines. Any other combination is returned unchanged, which lets
/// callers request arbitrary paths directly.
fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
    let is_mysql_family = matches!(engine, "mysql" | "mariadb");
    let container_path = match log_file_name {
        "error/postgres.log" | "trace/postgres-trace.log" => {
            "/var/log/postgresql/postgresql.log"
        }
        "error/mysql-error.log" if is_mysql_family => "/var/log/mysql/error.log",
        "slowquery/mysql-slowquery.log" if is_mysql_family => "/var/log/mysql/slow.log",
        passthrough => passthrough,
    };
    container_path.to_owned()
}
915
/// A list of `T` values paired with an optional marker string.
// NOTE(review): presumably one page of a paginated Describe* response —
// the code that builds and consumes this type is not visible here; confirm.
pub(crate) struct PaginationResult<T> {
    /// The values carried by this result (presumably the current page).
    items: Vec<T>,
    /// Optional marker; presumably the token for fetching the next page,
    /// with `None` meaning there are no further pages — TODO confirm.
    next_marker: Option<String>,
}
920
921/// Attach `instance_id` to the cluster's `DBClusterMembers` array,
922/// promoting it to writer when the cluster has none. Idempotent:
923/// re-attaching an existing member is a no-op.
924fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
925    use serde_json::{json, Value};
926    let Some(map) = state.extras.get_mut("clusters") else {
927        return;
928    };
929    let Some(entry) = map.get_mut(cluster_id) else {
930        return;
931    };
932    let Some(obj) = entry.as_object_mut() else {
933        return;
934    };
935    let mut members: Vec<Value> = obj
936        .get("DBClusterMembers")
937        .and_then(|v| v.as_array())
938        .cloned()
939        .unwrap_or_default();
940    if members
941        .iter()
942        .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
943    {
944        return;
945    }
946    let has_writer = members
947        .iter()
948        .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
949    let promotion_tier = (members.len() as i64) + 1;
950    members.push(json!({
951        "DBInstanceIdentifier": instance_id,
952        "IsClusterWriter": !has_writer,
953        "DBClusterParameterGroupStatus": "in-sync",
954        "PromotionTier": promotion_tier,
955    }));
956    obj.insert("DBClusterMembers".to_string(), Value::Array(members));
957    if !has_writer {
958        obj.insert(
959            "WriterDBInstanceIdentifier".to_string(),
960            Value::String(instance_id.to_string()),
961        );
962    }
963}
964
965#[path = "../service_helpers.rs"]
966mod service_helpers;
967pub(crate) use service_helpers::*;
968
969#[cfg(test)]
970#[path = "../service_tests.rs"]
971mod tests;