Skip to main content

fakecloud_rds/service/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20    SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
23const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
25const SUPPORTED_ACTIONS: &[&str] = &[
26    "AddRoleToDBCluster",
27    "AddRoleToDBInstance",
28    "AddSourceIdentifierToSubscription",
29    "AddTagsToResource",
30    "ApplyPendingMaintenanceAction",
31    "AuthorizeDBSecurityGroupIngress",
32    "BacktrackDBCluster",
33    "CancelExportTask",
34    "CopyDBClusterParameterGroup",
35    "CopyDBClusterSnapshot",
36    "CopyDBParameterGroup",
37    "CopyDBSnapshot",
38    "CopyOptionGroup",
39    "CreateBlueGreenDeployment",
40    "CreateCustomDBEngineVersion",
41    "CreateDBCluster",
42    "CreateDBClusterEndpoint",
43    "CreateDBClusterParameterGroup",
44    "CreateDBClusterSnapshot",
45    "CreateDBInstance",
46    "CreateDBInstanceReadReplica",
47    "CreateDBParameterGroup",
48    "CreateDBProxy",
49    "CreateDBProxyEndpoint",
50    "CreateDBSecurityGroup",
51    "CreateDBShardGroup",
52    "CreateDBSnapshot",
53    "CreateDBSubnetGroup",
54    "CreateEventSubscription",
55    "CreateGlobalCluster",
56    "CreateIntegration",
57    "CreateOptionGroup",
58    "CreateTenantDatabase",
59    "DeleteBlueGreenDeployment",
60    "DeleteCustomDBEngineVersion",
61    "DeleteDBCluster",
62    "DeleteDBClusterAutomatedBackup",
63    "DeleteDBClusterEndpoint",
64    "DeleteDBClusterParameterGroup",
65    "DeleteDBClusterSnapshot",
66    "DeleteDBInstance",
67    "DeleteDBInstanceAutomatedBackup",
68    "DeleteDBParameterGroup",
69    "DeleteDBProxy",
70    "DeleteDBProxyEndpoint",
71    "DeleteDBSecurityGroup",
72    "DeleteDBShardGroup",
73    "DeleteDBSnapshot",
74    "DeleteDBSubnetGroup",
75    "DeleteEventSubscription",
76    "DeleteGlobalCluster",
77    "DeleteIntegration",
78    "DeleteOptionGroup",
79    "DeleteTenantDatabase",
80    "DeregisterDBProxyTargets",
81    "DescribeAccountAttributes",
82    "DescribeBlueGreenDeployments",
83    "DescribeCertificates",
84    "DescribeDBClusterAutomatedBackups",
85    "DescribeDBClusterBacktracks",
86    "DescribeDBClusterEndpoints",
87    "DescribeDBClusterParameterGroups",
88    "DescribeDBClusterParameters",
89    "DescribeDBClusterSnapshotAttributes",
90    "DescribeDBClusterSnapshots",
91    "DescribeDBClusters",
92    "DescribeDBEngineVersions",
93    "DescribeDBInstanceAutomatedBackups",
94    "DescribeDBInstances",
95    "DescribeDBLogFiles",
96    "DescribeDBMajorEngineVersions",
97    "DescribeDBParameterGroups",
98    "DescribeDBParameters",
99    "DescribeDBProxies",
100    "DescribeDBProxyEndpoints",
101    "DescribeDBProxyTargetGroups",
102    "DescribeDBProxyTargets",
103    "DescribeDBRecommendations",
104    "DescribeDBSecurityGroups",
105    "DescribeDBShardGroups",
106    "DescribeDBSnapshotAttributes",
107    "DescribeDBSnapshotTenantDatabases",
108    "DescribeDBSnapshots",
109    "DescribeDBSubnetGroups",
110    "DescribeEngineDefaultClusterParameters",
111    "DescribeEngineDefaultParameters",
112    "DescribeEventCategories",
113    "DescribeEventSubscriptions",
114    "DescribeEvents",
115    "DescribeExportTasks",
116    "DescribeGlobalClusters",
117    "DescribeIntegrations",
118    "DescribeOptionGroupOptions",
119    "DescribeOptionGroups",
120    "DescribeOrderableDBInstanceOptions",
121    "DescribePendingMaintenanceActions",
122    "DescribeReservedDBInstances",
123    "DescribeReservedDBInstancesOfferings",
124    "DescribeServerlessV2PlatformVersions",
125    "DescribeSourceRegions",
126    "DescribeTenantDatabases",
127    "DescribeValidDBInstanceModifications",
128    "DisableHttpEndpoint",
129    "DownloadDBLogFilePortion",
130    "EnableHttpEndpoint",
131    "FailoverDBCluster",
132    "FailoverGlobalCluster",
133    "ListTagsForResource",
134    "ModifyActivityStream",
135    "ModifyCertificates",
136    "ModifyCurrentDBClusterCapacity",
137    "ModifyCustomDBEngineVersion",
138    "ModifyDBCluster",
139    "ModifyDBClusterEndpoint",
140    "ModifyDBClusterParameterGroup",
141    "ModifyDBClusterSnapshotAttribute",
142    "ModifyDBInstance",
143    "ModifyDBParameterGroup",
144    "ModifyDBProxy",
145    "ModifyDBProxyEndpoint",
146    "ModifyDBProxyTargetGroup",
147    "ModifyDBRecommendation",
148    "ModifyDBShardGroup",
149    "ModifyDBSnapshot",
150    "ModifyDBSnapshotAttribute",
151    "ModifyDBSubnetGroup",
152    "ModifyEventSubscription",
153    "ModifyGlobalCluster",
154    "ModifyIntegration",
155    "ModifyOptionGroup",
156    "ModifyTenantDatabase",
157    "PromoteReadReplica",
158    "PromoteReadReplicaDBCluster",
159    "PurchaseReservedDBInstancesOffering",
160    "RebootDBCluster",
161    "RebootDBInstance",
162    "RebootDBShardGroup",
163    "RegisterDBProxyTargets",
164    "RemoveFromGlobalCluster",
165    "RemoveRoleFromDBCluster",
166    "RemoveRoleFromDBInstance",
167    "RemoveSourceIdentifierFromSubscription",
168    "RemoveTagsFromResource",
169    "ResetDBClusterParameterGroup",
170    "ResetDBParameterGroup",
171    "RestoreDBClusterFromS3",
172    "RestoreDBClusterFromSnapshot",
173    "RestoreDBClusterToPointInTime",
174    "RestoreDBInstanceFromDBSnapshot",
175    "RestoreDBInstanceFromS3",
176    "RestoreDBInstanceToPointInTime",
177    "RevokeDBSecurityGroupIngress",
178    "StartActivityStream",
179    "StartDBCluster",
180    "StartDBInstance",
181    "StartDBInstanceAutomatedBackupsReplication",
182    "StartExportTask",
183    "StopActivityStream",
184    "StopDBCluster",
185    "StopDBInstance",
186    "StopDBInstanceAutomatedBackupsReplication",
187    "SwitchoverBlueGreenDeployment",
188    "SwitchoverGlobalCluster",
189    "SwitchoverReadReplica",
190];
191
192pub struct RdsService {
193    pub(crate) state: SharedRdsState,
194    runtime: Option<Arc<RdsRuntime>>,
195    snapshot_store: Option<Arc<dyn SnapshotStore>>,
196    snapshot_lock: Arc<AsyncMutex<()>>,
197    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
198}
199
200/// Source type for RDS EventBridge events. Maps `aws.rds` detail-type.
201#[derive(Clone, Copy)]
202#[allow(dead_code, clippy::enum_variant_names)]
203pub(crate) enum RdsSourceType {
204    DbInstance,
205    DbSnapshot,
206    DbParameterGroup,
207    DbCluster,
208    DbClusterSnapshot,
209}
210
211impl RdsSourceType {
212    /// EventBridge `SourceType` enum string. Matches the SCREAMING_SNAKE
213    /// form AWS publishes in the `aws.rds` event detail.
214    fn as_str(self) -> &'static str {
215        match self {
216            Self::DbInstance => "DB_INSTANCE",
217            Self::DbSnapshot => "DB_SNAPSHOT",
218            Self::DbParameterGroup => "DB_PARAMETER_GROUP",
219            Self::DbCluster => "DB_CLUSTER",
220            Self::DbClusterSnapshot => "DB_CLUSTER_SNAPSHOT",
221        }
222    }
223
224    /// `DescribeEvents` `SourceType` filter / response value. Per AWS
225    /// API spec this is the kebab-case form (`db-instance`,
226    /// `db-cluster`, `db-snapshot`, `db-parameter-group`, ...) — distinct
227    /// from the EventBridge `SourceType` returned by [`Self::as_str`].
228    pub(crate) fn describe_events_str(self) -> &'static str {
229        match self {
230            Self::DbInstance => "db-instance",
231            Self::DbSnapshot => "db-snapshot",
232            Self::DbParameterGroup => "db-parameter-group",
233            Self::DbCluster => "db-cluster",
234            Self::DbClusterSnapshot => "db-cluster-snapshot",
235        }
236    }
237
238    fn detail_type(self) -> &'static str {
239        match self {
240            Self::DbInstance => "RDS DB Instance Event",
241            Self::DbSnapshot => "RDS DB Snapshot Event",
242            Self::DbParameterGroup => "RDS DB Parameter Group Event",
243            Self::DbCluster => "RDS DB Cluster Event",
244            Self::DbClusterSnapshot => "RDS DB Cluster Snapshot Event",
245        }
246    }
247}
248
249mod cluster_snapshots;
250mod engine;
251mod instances;
252mod log_files;
253mod parameter_groups;
254mod replicas;
255mod restore;
256mod snapshots;
257mod subnet_groups;
258mod tags;
259
260impl RdsService {
261    pub(crate) fn state_handle(&self) -> &SharedRdsState {
262        &self.state
263    }
264}
265
266impl RdsService {
267    pub fn new(state: SharedRdsState) -> Self {
268        Self {
269            state,
270            runtime: None,
271            snapshot_store: None,
272            snapshot_lock: Arc::new(AsyncMutex::new(())),
273            delivery_bus: None,
274        }
275    }
276
277    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
278        self.runtime = Some(runtime);
279        self
280    }
281
282    /// Crate-internal accessor for the optional runtime; needed by the
283    /// extras handler so cluster snapshot/restore paths can dump and
284    /// replay member databases via the live container runtime.
285    pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
286        self.runtime.as_ref()
287    }
288
289    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
290        self.snapshot_store = Some(store);
291        self
292    }
293
294    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
295        self.delivery_bus = Some(bus);
296        self
297    }
298
299    /// Emit an `aws.rds` EventBridge event mirroring the AWS RDS event schema.
300    /// Also records into the per-account events ring so DescribeEvents
301    /// can serve the row. No-op for the EventBridge side when the bus
302    /// isn't wired (tests, minimal configs).
303    pub(crate) fn emit_event(
304        &self,
305        source_type: RdsSourceType,
306        source_identifier: &str,
307        source_arn: &str,
308        event_id: &str,
309        event_categories: &[&str],
310        message: &str,
311    ) {
312        // Source the account_id off the source_arn (segment 4) — that's
313        // the canonical ARN form for RDS resources.
314        let account_id = source_arn.split(':').nth(4).unwrap_or("");
315        emit_event_static_with_state(
316            self.delivery_bus.as_ref(),
317            Some(&self.state),
318            if account_id.is_empty() {
319                None
320            } else {
321                Some(account_id)
322            },
323            source_type,
324            source_identifier,
325            source_arn,
326            event_id,
327            event_categories,
328            message,
329        );
330    }
331
332    async fn save_snapshot(&self) {
333        save_snapshot_static(
334            self.state.clone(),
335            self.snapshot_store.clone(),
336            self.snapshot_lock.clone(),
337        )
338        .await;
339    }
340
341    /// Recreate the backing Docker/Podman containers for persisted DB
342    /// instances after a fakecloud restart. Without this, persistent
343    /// mode loads the row back into memory with `db_instance_status =
344    /// available` but the container is gone, so the endpoint is dead
345    /// (issue #1338). Each candidate is flipped to `starting`
346    /// synchronously, then a background task brings the container back
347    /// and flips it back to `available`. Instances persisted as
348    /// `stopped` are skipped — `StartDBInstance` revives those.
349    pub async fn recover_persisted_containers(&self) {
350        let Some(runtime) = self.runtime.clone() else {
351            return;
352        };
353
354        struct Pending {
355            account_id: String,
356            region: String,
357            id: String,
358            arn: String,
359            engine: String,
360            engine_version: String,
361            username: String,
362            password: String,
363            db_name: String,
364        }
365
366        let pending: Vec<Pending> = {
367            let mut accounts = self.state.write();
368            let mut out = Vec::new();
369            for (_, state) in accounts.iter_mut() {
370                let account_id = state.account_id.clone();
371                let region = state.region.clone();
372                for (id, inst) in state.instances.iter_mut() {
373                    if !matches!(
374                        inst.db_instance_status.as_str(),
375                        "available" | "starting" | "modifying" | "rebooting" | "backing-up"
376                    ) {
377                        continue;
378                    }
379                    inst.db_instance_status = "starting".to_string();
380                    out.push(Pending {
381                        account_id: account_id.clone(),
382                        region: region.clone(),
383                        id: id.clone(),
384                        arn: inst.db_instance_arn.clone(),
385                        engine: inst.engine.clone(),
386                        engine_version: inst.engine_version.clone(),
387                        username: inst.master_username.clone(),
388                        password: inst.master_user_password.clone(),
389                        db_name: inst
390                            .db_name
391                            .clone()
392                            .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
393                    });
394                }
395            }
396            out
397        };
398
399        if pending.is_empty() {
400            return;
401        }
402        tracing::info!(
403            count = pending.len(),
404            "recovering backing containers for persisted rds instances",
405        );
406
407        for p in pending {
408            let runtime = runtime.clone();
409            let state = self.state.clone();
410            let snapshot_store = self.snapshot_store.clone();
411            let snapshot_lock = self.snapshot_lock.clone();
412            let delivery_bus = self.delivery_bus.clone();
413            tokio::spawn(async move {
414                match runtime
415                    .ensure_postgres(
416                        &p.id,
417                        &p.engine,
418                        &p.engine_version,
419                        &p.username,
420                        &p.password,
421                        &p.db_name,
422                        &p.account_id,
423                        &p.region,
424                    )
425                    .await
426                {
427                    Ok(running) => {
428                        {
429                            let mut accounts = state.write();
430                            if let Some(s) = accounts.get_mut(&p.account_id) {
431                                if let Some(inst) = s.instances.get_mut(&p.id) {
432                                    inst.db_instance_status = "available".to_string();
433                                    inst.endpoint_address = running.endpoint_address.clone();
434                                    inst.port = i32::from(running.endpoint_port);
435                                    inst.host_port = running.host_port;
436                                    inst.container_id = running.container_id;
437                                }
438                            }
439                        }
440                        save_snapshot_static(
441                            state.clone(),
442                            snapshot_store.clone(),
443                            snapshot_lock.clone(),
444                        )
445                        .await;
446                        emit_event_static(
447                            delivery_bus.as_ref(),
448                            RdsSourceType::DbInstance,
449                            &p.id,
450                            &p.arn,
451                            "RDS-EVENT-0088",
452                            &["notification"],
453                            "DB instance restarted after fakecloud restart",
454                        );
455                    }
456                    Err(error) => {
457                        tracing::error!(
458                            %error,
459                            db_instance_identifier = %p.id,
460                            "failed to recover rds backing container after restart",
461                        );
462                        {
463                            let mut accounts = state.write();
464                            if let Some(s) = accounts.get_mut(&p.account_id) {
465                                if let Some(inst) = s.instances.get_mut(&p.id) {
466                                    inst.db_instance_status = "failed".to_string();
467                                }
468                            }
469                        }
470                        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
471                    }
472                }
473            });
474        }
475    }
476
477    /// Stop the backing container for `db_instance_identifier` and mark
478    /// the row `stopped`. Synchronous wrt the runtime so callers see a
479    /// real `stopped` status by the time the response goes back; mirrors
480    /// the AWS contract that `StartDBInstance`/`StopDBInstance` change
481    /// the visible status immediately.
482    async fn stop_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
483        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
484
485        // Look up the instance first so a missing identifier returns the
486        // declared `DBInstanceNotFoundFault` error rather than a 503 from a
487        // missing container runtime. Conformance probes hit Start/Stop with
488        // synthetic identifiers and expect the documented error shape.
489        let arn = {
490            let accounts = self.state.read();
491            let empty = RdsState::new(&request.account_id, &request.region);
492            let state = accounts.get(&request.account_id).unwrap_or(&empty);
493            state
494                .instances
495                .get(&db_instance_identifier)
496                .map(|i| i.db_instance_arn.clone())
497                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
498        };
499
500        if let Some(runtime) = self.runtime.as_ref() {
501            runtime.stop_container(&db_instance_identifier).await;
502        }
503
504        let instance = {
505            let mut accounts = self.state.write();
506            let state = accounts.get_or_create(&request.account_id);
507            let inst = state
508                .instances
509                .get_mut(&db_instance_identifier)
510                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
511            inst.db_instance_status = "stopped".to_string();
512            inst.container_id = String::new();
513            inst.clone()
514        };
515
516        self.emit_event(
517            RdsSourceType::DbInstance,
518            &db_instance_identifier,
519            &arn,
520            "RDS-EVENT-0089",
521            &["notification"],
522            "DB instance stopped",
523        );
524
525        Ok(AwsResponse::xml(
526            StatusCode::OK,
527            query_response_xml(
528                "StopDBInstance",
529                RDS_NS,
530                &format!(
531                    "<DBInstance>{}</DBInstance>",
532                    db_instance_xml(&instance, Some("stopped"))
533                ),
534                &request.request_id,
535            ),
536        ))
537    }
538
539    /// Restart a stopped DB instance: spin up the backing container and
540    /// flip the row back to `available`. Mirrors AWS `StartDBInstance`.
541    async fn start_db_instance(
542        &self,
543        request: &AwsRequest,
544    ) -> Result<AwsResponse, AwsServiceError> {
545        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
546
547        // Look up the instance first so a missing identifier returns the
548        // declared `DBInstanceNotFoundFault` error rather than a 503 from a
549        // missing container runtime.
550        let instance = {
551            let accounts = self.state.read();
552            let empty = RdsState::new(&request.account_id, &request.region);
553            let state = accounts.get(&request.account_id).unwrap_or(&empty);
554            state
555                .instances
556                .get(&db_instance_identifier)
557                .cloned()
558                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
559        };
560
561        // Flip to `starting` so concurrent DescribeDBInstances callers
562        // see the in-flight state.
563        {
564            let mut accounts = self.state.write();
565            let state = accounts.get_or_create(&request.account_id);
566            if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
567                inst.db_instance_status = "starting".to_string();
568            }
569        }
570
571        let running = if let Some(runtime) = self.runtime.as_ref() {
572            match runtime
573                .ensure_postgres(
574                    &db_instance_identifier,
575                    &instance.engine,
576                    &instance.engine_version,
577                    &instance.master_username,
578                    &instance.master_user_password,
579                    instance
580                        .db_name
581                        .as_deref()
582                        .unwrap_or(default_db_name(&instance.engine)),
583                    &request.account_id,
584                    &request.region,
585                )
586                .await
587            {
588                Ok(r) => Some(r),
589                Err(e) => {
590                    // Roll the row back to `stopped` so the next
591                    // Describe doesn't report a permanently-`starting`
592                    // instance with no container behind it.
593                    let mut accounts = self.state.write();
594                    let state = accounts.get_or_create(&request.account_id);
595                    if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
596                        inst.db_instance_status = "stopped".to_string();
597                    }
598                    return Err(runtime_error_to_service_error(e));
599                }
600            }
601        } else {
602            // No container runtime configured: StartDBInstance should
603            // fail rather than report success against a backend that
604            // does not exist. Roll the row back so subsequent calls
605            // don't see a stuck `starting` state.
606            {
607                let mut accounts = self.state.write();
608                let state = accounts.get_or_create(&request.account_id);
609                if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
610                    inst.db_instance_status = "stopped".to_string();
611                }
612            }
613            return Err(AwsServiceError::aws_error(
614                StatusCode::SERVICE_UNAVAILABLE,
615                "InternalFailure",
616                "Container runtime is not configured; cannot start DB instance",
617            ));
618        };
619
620        let instance = {
621            let mut accounts = self.state.write();
622            let state = accounts.get_or_create(&request.account_id);
623            let inst = state
624                .instances
625                .get_mut(&db_instance_identifier)
626                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
627            inst.db_instance_status = "available".to_string();
628            inst.endpoint_address = "127.0.0.1".to_string();
629            if let Some(r) = running {
630                inst.endpoint_address = r.endpoint_address.clone();
631                inst.port = i32::from(r.endpoint_port);
632                inst.host_port = r.host_port;
633                inst.container_id = r.container_id;
634            }
635            inst.clone()
636        };
637
638        self.emit_event(
639            RdsSourceType::DbInstance,
640            &db_instance_identifier,
641            &instance.db_instance_arn,
642            "RDS-EVENT-0088",
643            &["notification"],
644            "DB instance started",
645        );
646
647        Ok(AwsResponse::xml(
648            StatusCode::OK,
649            query_response_xml(
650                "StartDBInstance",
651                RDS_NS,
652                &format!(
653                    "<DBInstance>{}</DBInstance>",
654                    db_instance_xml(&instance, None)
655                ),
656                &request.request_id,
657            ),
658        ))
659    }
660}
661
662/// Persist the current `RdsState` to the configured snapshot store. Free
663/// function so background tasks (e.g. the create-DB-instance container-start
664/// task) can save without holding a `&RdsService`. Returns immediately when
665/// Whether the given AWS error code is in the AddTagsToResource Smithy
666/// model's declared error set. Used so undeclared `*NotFound` codes
667/// (OptionGroup, ParameterGroup, EventSubscription, SecurityGroup) get
668/// swallowed into a no-op rather than surfaced as a non-modelled error.
669fn is_declared_add_tags_not_found(code: &str) -> bool {
670    matches!(
671        code,
672        "BlueGreenDeploymentNotFoundFault"
673            | "DBClusterNotFoundFault"
674            | "DBInstanceNotFound"
675            | "DBProxyEndpointNotFoundFault"
676            | "DBProxyNotFoundFault"
677            | "DBProxyTargetGroupNotFoundFault"
678            | "DBShardGroupNotFound"
679            | "DBSnapshotNotFound"
680            | "DBSnapshotTenantDatabaseNotFoundFault"
681            | "IntegrationNotFoundFault"
682            | "InvalidDBClusterEndpointStateFault"
683            | "InvalidDBClusterStateFault"
684            | "InvalidDBInstanceState"
685            | "TenantDatabaseNotFound"
686    )
687}
688
689/// no store is configured (memory-mode runs).
690async fn save_snapshot_static(
691    state: SharedRdsState,
692    store: Option<Arc<dyn SnapshotStore>>,
693    lock: Arc<AsyncMutex<()>>,
694) {
695    let Some(store) = store else {
696        return;
697    };
698    let _guard = lock.lock().await;
699    let snapshot = RdsSnapshot {
700        schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
701        state: None,
702        accounts: Some(state.read().clone()),
703    };
704    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
705        let bytes = serde_json::to_vec(&snapshot)
706            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
707        store.save(&bytes)
708    })
709    .await;
710    match join {
711        Ok(Ok(())) => {}
712        Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
713        Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
714    }
715}
716
717impl RdsService {
718    /// Return the runtime or a ``ServiceUnavailable`` error if it was not configured.
719    ///
720    /// RDS operations that start, stop, or reach into a database container fail
721    /// with a consistent wire error when the daemon (Docker/Podman) is missing
722    /// rather than each caller restating the message.
723    /// Resolve the container runtime or return a declared error shape.
724    /// `InsufficientDBInstanceCapacity` is declared on every op that
725    /// calls `require_runtime` (Create/Modify/Restore* DB instance and
726    /// Read Replica), and is the closest Smithy-modelled analogue for
727    /// "fakecloud can't satisfy this DB request right now".
728    fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
729        self.runtime.as_ref().ok_or_else(|| {
730            AwsServiceError::aws_error(
731                StatusCode::SERVICE_UNAVAILABLE,
732                "InsufficientDBInstanceCapacity",
733                "Docker/Podman is required for RDS DB instances but is not available",
734            )
735        })
736    }
737}
738
739#[async_trait]
740impl AwsService for RdsService {
741    fn service_name(&self) -> &str {
742        "rds"
743    }
744
745    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
746        // Centralized Smithy-aligned validation. Returns the appropriate
747        // `MissingParameter` / `InvalidParameterValue` error before the
748        // per-action handler runs. Actions without a constraint entry
749        // fall through unchanged.
750        crate::validation::prevalidate(request.action.as_str(), &request)?;
751
752        let mutates = is_mutating_action(request.action.as_str());
753        let result = match request.action.as_str() {
754            "AddTagsToResource" => self.add_tags_to_resource(&request),
755            "CreateDBInstance" => self.create_db_instance(&request).await,
756            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
757            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
758            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
759            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
760            "DeleteDBInstance" => self.delete_db_instance(&request).await,
761            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
762            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
763            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
764            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
765            "DescribeDBInstances" => self.describe_db_instances(&request),
766            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
767            "DescribeDBParameters" => self.describe_db_parameters_real(&request),
768            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
769            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
770            "DescribeOrderableDBInstanceOptions" => {
771                self.describe_orderable_db_instance_options(&request)
772            }
773            "ListTagsForResource" => self.list_tags_for_resource(&request),
774            "ModifyDBInstance" => self.modify_db_instance(&request),
775            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
776            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
777            "RebootDBInstance" => self.reboot_db_instance(&request).await,
778            "StartDBInstance" => self.start_db_instance(&request).await,
779            "StopDBInstance" => self.stop_db_instance(&request).await,
780            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
781            "RestoreDBInstanceFromDBSnapshot" => {
782                self.restore_db_instance_from_db_snapshot(&request).await
783            }
784            "RestoreDBInstanceToPointInTime" => {
785                self.restore_db_instance_to_point_in_time(&request).await
786            }
787            "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
788            "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
789            "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
790            "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
791            "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
792            "RestoreDBClusterToPointInTime" => {
793                self.restore_db_cluster_to_point_in_time(&request).await
794            }
795            _ => self.handle_extra_action(&request),
796        };
797        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
798            self.save_snapshot().await;
799        }
800        result
801    }
802
803    fn supported_actions(&self) -> &[&str] {
804        SUPPORTED_ACTIONS
805    }
806}
807
808impl RdsService {}
809
810/// Render a single user-set parameter as the XML shape AWS emits inside
811/// `DescribeDB(Cluster)Parameters` responses. We don't store metadata
812/// alongside user values so we report `dynamic`/`string` defaults.
813pub(crate) fn render_user_parameter_xml(name: &str, value: &str) -> String {
814    format!(
815        "      <Parameter>\n        <ParameterName>{}</ParameterName>\n        <ParameterValue>{}</ParameterValue>\n        <Source>user</Source>\n        <ApplyType>dynamic</ApplyType>\n        <DataType>string</DataType>\n        <IsModifiable>true</IsModifiable>\n      </Parameter>\n",
816        xml_escape(name),
817        xml_escape(value),
818    )
819}
820
821/// Render a single engine-default parameter as the XML shape AWS emits
822/// inside `DescribeDB(Cluster)Parameters` and
823/// `DescribeEngineDefault(Cluster)Parameters` responses.
824pub(crate) fn render_engine_default_parameter_xml(
825    default: &crate::state::EngineDefaultParameter,
826) -> String {
827    format!(
828        "      <Parameter>\n        <ParameterName>{}</ParameterName>\n        <ParameterValue>{}</ParameterValue>\n        <Source>engine-default</Source>\n        <ApplyType>{}</ApplyType>\n        <DataType>{}</DataType>\n        <AllowedValues>{}</AllowedValues>\n        <IsModifiable>{}</IsModifiable>\n      </Parameter>\n",
829        xml_escape(default.name),
830        xml_escape(default.value),
831        xml_escape(default.apply_type),
832        xml_escape(default.data_type),
833        xml_escape(default.allowed_values),
834        default.is_modifiable,
835    )
836}
837
838/// Parse `Parameters.{Parameter|member}.N.{ParameterName,ParameterValue,ApplyMethod}`
839/// from a Query-protocol request. AWS RDS uses `Parameters.Parameter.N`
840/// (the `Parameter` list location name from the Smithy model); we also
841/// accept the generic `Parameters.member.N` form so hand-built clients
842/// using the default Query list shape keep working. Skips members
843/// missing a name or value; ApplyMethod is accepted but ignored
844/// (single-state model).
845pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<(String, String)> {
846    let mut out = Vec::new();
847    for prefix in ["Parameters.Parameter", "Parameters.member"] {
848        let mut index = 1;
849        loop {
850            let name_key = format!("{prefix}.{index}.ParameterName");
851            let value_key = format!("{prefix}.{index}.ParameterValue");
852            let name = optional_query_param(request, &name_key);
853            let value = optional_query_param(request, &value_key);
854            if name.is_none() && value.is_none() {
855                break;
856            }
857            if let (Some(n), Some(v)) = (name, value) {
858                if !n.is_empty() {
859                    out.push((n, v));
860                }
861            }
862            index += 1;
863        }
864    }
865    out
866}
867
868/// Resolve an AWS-shaped log file name (e.g. `error/postgres.log`) to
869/// the absolute path inside the running container. Unknown names fall
870/// through as-is so callers can also fetch arbitrary paths.
871fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
872    match (engine, log_file_name) {
873        (_, "error/postgres.log") => "/var/log/postgresql/postgresql.log".to_string(),
874        (_, "trace/postgres-trace.log") => "/var/log/postgresql/postgresql.log".to_string(),
875        ("mysql" | "mariadb", "error/mysql-error.log") => "/var/log/mysql/error.log".to_string(),
876        ("mysql" | "mariadb", "slowquery/mysql-slowquery.log") => {
877            "/var/log/mysql/slow.log".to_string()
878        }
879        _ => log_file_name.to_string(),
880    }
881}
882
883pub(crate) struct PaginationResult<T> {
884    items: Vec<T>,
885    next_marker: Option<String>,
886}
887
888/// Attach `instance_id` to the cluster's `DBClusterMembers` array,
889/// promoting it to writer when the cluster has none. Idempotent:
890/// re-attaching an existing member is a no-op.
891fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
892    use serde_json::{json, Value};
893    let Some(map) = state.extras.get_mut("clusters") else {
894        return;
895    };
896    let Some(entry) = map.get_mut(cluster_id) else {
897        return;
898    };
899    let Some(obj) = entry.as_object_mut() else {
900        return;
901    };
902    let mut members: Vec<Value> = obj
903        .get("DBClusterMembers")
904        .and_then(|v| v.as_array())
905        .cloned()
906        .unwrap_or_default();
907    if members
908        .iter()
909        .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
910    {
911        return;
912    }
913    let has_writer = members
914        .iter()
915        .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
916    let promotion_tier = (members.len() as i64) + 1;
917    members.push(json!({
918        "DBInstanceIdentifier": instance_id,
919        "IsClusterWriter": !has_writer,
920        "DBClusterParameterGroupStatus": "in-sync",
921        "PromotionTier": promotion_tier,
922    }));
923    obj.insert("DBClusterMembers".to_string(), Value::Array(members));
924    if !has_writer {
925        obj.insert(
926            "WriterDBInstanceIdentifier".to_string(),
927            Value::String(instance_id.to_string()),
928        );
929    }
930}
931
932#[path = "../service_helpers.rs"]
933mod service_helpers;
934pub(crate) use service_helpers::*;
935
936#[cfg(test)]
937#[path = "../service_tests.rs"]
938mod tests;