//! `fakecloud_rds/service/mod.rs` — the RDS service (`RdsService`) and its
//! shared helpers.

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20    SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
/// XML namespace of the RDS Query API (API version 2014-10-31); passed as
/// the namespace argument to `query_response_xml` when building responses.
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
/// RDS Query API action names this service advertises as supported.
///
/// The list is kept in byte-wise ascending order (e.g. `DescribeEvents`
/// sorts after `DescribeEventSubscriptions` because `S` < `s`); keep that
/// order when adding actions so the list stays easy to diff.
/// NOTE(review): presumably consumed by the `AwsService` impl, which is not
/// visible in this chunk — confirm before relying on the order.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddRoleToDBCluster",
    "AddRoleToDBInstance",
    "AddSourceIdentifierToSubscription",
    "AddTagsToResource",
    "ApplyPendingMaintenanceAction",
    "AuthorizeDBSecurityGroupIngress",
    "BacktrackDBCluster",
    "CancelExportTask",
    "CopyDBClusterParameterGroup",
    "CopyDBClusterSnapshot",
    "CopyDBParameterGroup",
    "CopyDBSnapshot",
    "CopyOptionGroup",
    "CreateBlueGreenDeployment",
    "CreateCustomDBEngineVersion",
    "CreateDBCluster",
    "CreateDBClusterEndpoint",
    "CreateDBClusterParameterGroup",
    "CreateDBClusterSnapshot",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBProxy",
    "CreateDBProxyEndpoint",
    "CreateDBSecurityGroup",
    "CreateDBShardGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "CreateEventSubscription",
    "CreateGlobalCluster",
    "CreateIntegration",
    "CreateOptionGroup",
    "CreateTenantDatabase",
    "DeleteBlueGreenDeployment",
    "DeleteCustomDBEngineVersion",
    "DeleteDBCluster",
    "DeleteDBClusterAutomatedBackup",
    "DeleteDBClusterEndpoint",
    "DeleteDBClusterParameterGroup",
    "DeleteDBClusterSnapshot",
    "DeleteDBInstance",
    "DeleteDBInstanceAutomatedBackup",
    "DeleteDBParameterGroup",
    "DeleteDBProxy",
    "DeleteDBProxyEndpoint",
    "DeleteDBSecurityGroup",
    "DeleteDBShardGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DeleteEventSubscription",
    "DeleteGlobalCluster",
    "DeleteIntegration",
    "DeleteOptionGroup",
    "DeleteTenantDatabase",
    "DeregisterDBProxyTargets",
    "DescribeAccountAttributes",
    "DescribeBlueGreenDeployments",
    "DescribeCertificates",
    "DescribeDBClusterAutomatedBackups",
    "DescribeDBClusterBacktracks",
    "DescribeDBClusterEndpoints",
    "DescribeDBClusterParameterGroups",
    "DescribeDBClusterParameters",
    "DescribeDBClusterSnapshotAttributes",
    "DescribeDBClusterSnapshots",
    "DescribeDBClusters",
    "DescribeDBEngineVersions",
    "DescribeDBInstanceAutomatedBackups",
    "DescribeDBInstances",
    "DescribeDBLogFiles",
    "DescribeDBMajorEngineVersions",
    "DescribeDBParameterGroups",
    "DescribeDBParameters",
    "DescribeDBProxies",
    "DescribeDBProxyEndpoints",
    "DescribeDBProxyTargetGroups",
    "DescribeDBProxyTargets",
    "DescribeDBRecommendations",
    "DescribeDBSecurityGroups",
    "DescribeDBShardGroups",
    "DescribeDBSnapshotAttributes",
    "DescribeDBSnapshotTenantDatabases",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeEngineDefaultClusterParameters",
    "DescribeEngineDefaultParameters",
    "DescribeEventCategories",
    "DescribeEventSubscriptions",
    "DescribeEvents",
    "DescribeExportTasks",
    "DescribeGlobalClusters",
    "DescribeIntegrations",
    "DescribeOptionGroupOptions",
    "DescribeOptionGroups",
    "DescribeOrderableDBInstanceOptions",
    "DescribePendingMaintenanceActions",
    "DescribeReservedDBInstances",
    "DescribeReservedDBInstancesOfferings",
    "DescribeServerlessV2PlatformVersions",
    "DescribeSourceRegions",
    "DescribeTenantDatabases",
    "DescribeValidDBInstanceModifications",
    "DisableHttpEndpoint",
    "DownloadDBLogFilePortion",
    "EnableHttpEndpoint",
    "FailoverDBCluster",
    "FailoverGlobalCluster",
    "ListTagsForResource",
    "ModifyActivityStream",
    "ModifyCertificates",
    "ModifyCurrentDBClusterCapacity",
    "ModifyCustomDBEngineVersion",
    "ModifyDBCluster",
    "ModifyDBClusterEndpoint",
    "ModifyDBClusterParameterGroup",
    "ModifyDBClusterSnapshotAttribute",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBProxy",
    "ModifyDBProxyEndpoint",
    "ModifyDBProxyTargetGroup",
    "ModifyDBRecommendation",
    "ModifyDBShardGroup",
    "ModifyDBSnapshot",
    "ModifyDBSnapshotAttribute",
    "ModifyDBSubnetGroup",
    "ModifyEventSubscription",
    "ModifyGlobalCluster",
    "ModifyIntegration",
    "ModifyOptionGroup",
    "ModifyTenantDatabase",
    "PromoteReadReplica",
    "PromoteReadReplicaDBCluster",
    "PurchaseReservedDBInstancesOffering",
    "RebootDBCluster",
    "RebootDBInstance",
    "RebootDBShardGroup",
    "RegisterDBProxyTargets",
    "RemoveFromGlobalCluster",
    "RemoveRoleFromDBCluster",
    "RemoveRoleFromDBInstance",
    "RemoveSourceIdentifierFromSubscription",
    "RemoveTagsFromResource",
    "ResetDBClusterParameterGroup",
    "ResetDBParameterGroup",
    "RestoreDBClusterFromS3",
    "RestoreDBClusterFromSnapshot",
    "RestoreDBClusterToPointInTime",
    "RestoreDBInstanceFromDBSnapshot",
    "RestoreDBInstanceFromS3",
    "RestoreDBInstanceToPointInTime",
    "RevokeDBSecurityGroupIngress",
    "StartActivityStream",
    "StartDBCluster",
    "StartDBInstance",
    "StartDBInstanceAutomatedBackupsReplication",
    "StartExportTask",
    "StopActivityStream",
    "StopDBCluster",
    "StopDBInstance",
    "StopDBInstanceAutomatedBackupsReplication",
    "SwitchoverBlueGreenDeployment",
    "SwitchoverGlobalCluster",
    "SwitchoverReadReplica",
];
191
/// RDS service: serves the RDS Query API actions against per-account
/// in-memory state, optionally backed by a container runtime (real DB
/// engines), a snapshot store (persistence), and an EventBridge delivery bus.
pub struct RdsService {
    /// Per-account RDS state; cloned into background tasks so they can write
    /// results back after the request has returned.
    pub(crate) state: SharedRdsState,
    /// Docker/Podman runtime backing DB instances. `None` makes
    /// container-dependent actions fail with an error instead of succeeding.
    runtime: Option<Arc<RdsRuntime>>,
    /// Where state snapshots are persisted. `None` (memory mode) turns
    /// snapshot saves into no-ops.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Serializes snapshot saves, including those issued by background tasks.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Bus for `aws.rds` EventBridge events. `None` skips EventBridge
    /// delivery.
    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
}
199
/// Source type for RDS EventBridge events. Maps `aws.rds` detail-type.
///
/// Each variant has three wire spellings, produced by the methods on this
/// type: the EventBridge `SourceType` (`DB_INSTANCE`, ...), the
/// `DescribeEvents` `SourceType` (`db-instance`, ...), and the EventBridge
/// `detail-type` (`RDS DB Instance Event`, ...).
#[derive(Clone, Copy)]
// `clippy::enum_variant_names`: every variant shares the `Db` prefix, which
// that lint flags. NOTE(review): `dead_code` is presumably allowed because
// not every variant is constructed yet — confirm before removing it.
#[allow(dead_code, clippy::enum_variant_names)]
pub(crate) enum RdsSourceType {
    DbInstance,
    DbSnapshot,
    DbParameterGroup,
    DbCluster,
    DbClusterSnapshot,
}
210
211impl RdsSourceType {
212    /// EventBridge `SourceType` enum string. Matches the SCREAMING_SNAKE
213    /// form AWS publishes in the `aws.rds` event detail.
214    fn as_str(self) -> &'static str {
215        match self {
216            Self::DbInstance => "DB_INSTANCE",
217            Self::DbSnapshot => "DB_SNAPSHOT",
218            Self::DbParameterGroup => "DB_PARAMETER_GROUP",
219            Self::DbCluster => "DB_CLUSTER",
220            Self::DbClusterSnapshot => "DB_CLUSTER_SNAPSHOT",
221        }
222    }
223
224    /// `DescribeEvents` `SourceType` filter / response value. Per AWS
225    /// API spec this is the kebab-case form (`db-instance`,
226    /// `db-cluster`, `db-snapshot`, `db-parameter-group`, ...) — distinct
227    /// from the EventBridge `SourceType` returned by [`Self::as_str`].
228    pub(crate) fn describe_events_str(self) -> &'static str {
229        match self {
230            Self::DbInstance => "db-instance",
231            Self::DbSnapshot => "db-snapshot",
232            Self::DbParameterGroup => "db-parameter-group",
233            Self::DbCluster => "db-cluster",
234            Self::DbClusterSnapshot => "db-cluster-snapshot",
235        }
236    }
237
238    fn detail_type(self) -> &'static str {
239        match self {
240            Self::DbInstance => "RDS DB Instance Event",
241            Self::DbSnapshot => "RDS DB Snapshot Event",
242            Self::DbParameterGroup => "RDS DB Parameter Group Event",
243            Self::DbCluster => "RDS DB Cluster Event",
244            Self::DbClusterSnapshot => "RDS DB Cluster Snapshot Event",
245        }
246    }
247}
248
249mod cluster_snapshots;
250mod engine;
251mod instances;
252mod log_files;
253mod parameter_groups;
254mod replicas;
255mod restore;
256mod snapshots;
257mod subnet_groups;
258mod tags;
259
260impl RdsService {
261    pub(crate) fn state_handle(&self) -> &SharedRdsState {
262        &self.state
263    }
264}
265
266impl RdsService {
267    pub fn new(state: SharedRdsState) -> Self {
268        Self {
269            state,
270            runtime: None,
271            snapshot_store: None,
272            snapshot_lock: Arc::new(AsyncMutex::new(())),
273            delivery_bus: None,
274        }
275    }
276
277    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
278        self.runtime = Some(runtime);
279        self
280    }
281
282    /// Crate-internal accessor for the optional runtime; needed by the
283    /// extras handler so cluster snapshot/restore paths can dump and
284    /// replay member databases via the live container runtime.
285    pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
286        self.runtime.as_ref()
287    }
288
289    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
290        self.snapshot_store = Some(store);
291        self
292    }
293
294    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
295        self.delivery_bus = Some(bus);
296        self
297    }
298
299    /// Emit an `aws.rds` EventBridge event mirroring the AWS RDS event schema.
300    /// Also records into the per-account events ring so DescribeEvents
301    /// can serve the row. No-op for the EventBridge side when the bus
302    /// isn't wired (tests, minimal configs).
303    pub(crate) fn emit_event(
304        &self,
305        source_type: RdsSourceType,
306        source_identifier: &str,
307        source_arn: &str,
308        event_id: &str,
309        event_categories: &[&str],
310        message: &str,
311    ) {
312        // Source the account_id off the source_arn (segment 4) — that's
313        // the canonical ARN form for RDS resources.
314        let account_id = source_arn.split(':').nth(4).unwrap_or("");
315        emit_event_static_with_state(
316            self.delivery_bus.as_ref(),
317            Some(&self.state),
318            if account_id.is_empty() {
319                None
320            } else {
321                Some(account_id)
322            },
323            source_type,
324            source_identifier,
325            source_arn,
326            event_id,
327            event_categories,
328            message,
329        );
330    }
331
332    async fn save_snapshot(&self) {
333        save_snapshot_static(
334            self.state.clone(),
335            self.snapshot_store.clone(),
336            self.snapshot_lock.clone(),
337        )
338        .await;
339    }
340
341    /// Recreate the backing Docker/Podman containers for persisted DB
342    /// instances after a fakecloud restart. Without this, persistent
343    /// mode loads the row back into memory with `db_instance_status =
344    /// available` but the container is gone, so the endpoint is dead
345    /// (issue #1338). Each candidate is flipped to `starting`
346    /// synchronously, then a background task brings the container back
347    /// and flips it back to `available`. Instances persisted as
348    /// `stopped` are skipped — `StartDBInstance` revives those.
349    pub async fn recover_persisted_containers(&self) {
350        let Some(runtime) = self.runtime.clone() else {
351            return;
352        };
353
354        struct Pending {
355            account_id: String,
356            region: String,
357            id: String,
358            arn: String,
359            engine: String,
360            engine_version: String,
361            username: String,
362            password: String,
363            db_name: String,
364            tags: Vec<crate::state::RdsTag>,
365        }
366
367        let pending: Vec<Pending> = {
368            let mut accounts = self.state.write();
369            let mut out = Vec::new();
370            for (_, state) in accounts.iter_mut() {
371                let account_id = state.account_id.clone();
372                let region = state.region.clone();
373                for (id, inst) in state.instances.iter_mut() {
374                    // "creating" is included so an instance whose background
375                    // create task hadn't finished (and re-saved it as
376                    // "available") when the process crashed is resumed rather
377                    // than silently dropped on restart — the API already
378                    // returned it to the client, so DescribeDBInstances must
379                    // not lose it (bug-hunt 2026-06-13, finding 4.3). Recovery
380                    // re-drives it through `ensure_*` to a live container.
381                    if !matches!(
382                        inst.db_instance_status.as_str(),
383                        "creating"
384                            | "available"
385                            | "starting"
386                            | "modifying"
387                            | "rebooting"
388                            | "backing-up"
389                    ) {
390                        continue;
391                    }
392                    inst.db_instance_status = "starting".to_string();
393                    out.push(Pending {
394                        account_id: account_id.clone(),
395                        region: region.clone(),
396                        id: id.clone(),
397                        arn: inst.db_instance_arn.clone(),
398                        engine: inst.engine.clone(),
399                        engine_version: inst.engine_version.clone(),
400                        username: inst.master_username.clone(),
401                        password: inst.master_user_password.clone(),
402                        db_name: inst
403                            .db_name
404                            .clone()
405                            .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
406                        tags: inst.tags.clone(),
407                    });
408                }
409            }
410            out
411        };
412
413        if pending.is_empty() {
414            return;
415        }
416        tracing::info!(
417            count = pending.len(),
418            "recovering backing containers for persisted rds instances",
419        );
420
421        for p in pending {
422            let runtime = runtime.clone();
423            let state = self.state.clone();
424            let snapshot_store = self.snapshot_store.clone();
425            let snapshot_lock = self.snapshot_lock.clone();
426            let delivery_bus = self.delivery_bus.clone();
427            tokio::spawn(async move {
428                match runtime
429                    .ensure_postgres(
430                        &p.id,
431                        &p.engine,
432                        &p.engine_version,
433                        &p.username,
434                        &p.password,
435                        &p.db_name,
436                        &p.account_id,
437                        &p.region,
438                        &p.tags,
439                    )
440                    .await
441                {
442                    Ok(running) => {
443                        {
444                            let mut accounts = state.write();
445                            if let Some(s) = accounts.get_mut(&p.account_id) {
446                                if let Some(inst) = s.instances.get_mut(&p.id) {
447                                    inst.db_instance_status = "available".to_string();
448                                    inst.endpoint_address = running.endpoint_address.clone();
449                                    inst.port = i32::from(running.endpoint_port);
450                                    inst.host_port = running.host_port;
451                                    inst.container_id = running.container_id;
452                                }
453                            }
454                        }
455                        save_snapshot_static(
456                            state.clone(),
457                            snapshot_store.clone(),
458                            snapshot_lock.clone(),
459                        )
460                        .await;
461                        emit_event_static(
462                            delivery_bus.as_ref(),
463                            RdsSourceType::DbInstance,
464                            &p.id,
465                            &p.arn,
466                            "RDS-EVENT-0088",
467                            &["notification"],
468                            "DB instance restarted after fakecloud restart",
469                        );
470                    }
471                    Err(error) => {
472                        tracing::error!(
473                            %error,
474                            db_instance_identifier = %p.id,
475                            "failed to recover rds backing container after restart",
476                        );
477                        {
478                            let mut accounts = state.write();
479                            if let Some(s) = accounts.get_mut(&p.account_id) {
480                                if let Some(inst) = s.instances.get_mut(&p.id) {
481                                    inst.db_instance_status = "failed".to_string();
482                                }
483                            }
484                        }
485                        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
486                    }
487                }
488            });
489        }
490    }
491
492    /// Stop the backing container for `db_instance_identifier` and mark
493    /// the row `stopped`. Synchronous wrt the runtime so callers see a
494    /// real `stopped` status by the time the response goes back; mirrors
495    /// the AWS contract that `StartDBInstance`/`StopDBInstance` change
496    /// the visible status immediately.
497    async fn stop_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
498        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
499
500        // Look up the instance first so a missing identifier returns the
501        // declared `DBInstanceNotFoundFault` error rather than a 503 from a
502        // missing container runtime. Conformance probes hit Start/Stop with
503        // synthetic identifiers and expect the documented error shape.
504        let arn = {
505            let accounts = self.state.read();
506            let empty = RdsState::new(&request.account_id, &request.region);
507            let state = accounts.get(&request.account_id).unwrap_or(&empty);
508            state
509                .instances
510                .get(&db_instance_identifier)
511                .map(|i| i.db_instance_arn.clone())
512                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
513        };
514
515        if let Some(runtime) = self.runtime.as_ref() {
516            runtime.stop_container(&db_instance_identifier).await;
517        }
518
519        let instance = {
520            let mut accounts = self.state.write();
521            let state = accounts.get_or_create(&request.account_id);
522            let inst = state
523                .instances
524                .get_mut(&db_instance_identifier)
525                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
526            inst.db_instance_status = "stopped".to_string();
527            inst.container_id = String::new();
528            inst.clone()
529        };
530
531        self.emit_event(
532            RdsSourceType::DbInstance,
533            &db_instance_identifier,
534            &arn,
535            "RDS-EVENT-0089",
536            &["notification"],
537            "DB instance stopped",
538        );
539
540        Ok(AwsResponse::xml(
541            StatusCode::OK,
542            query_response_xml(
543                "StopDBInstance",
544                RDS_NS,
545                &format!(
546                    "<DBInstance>{}</DBInstance>",
547                    db_instance_xml(&instance, Some("stopped"))
548                ),
549                &request.request_id,
550            ),
551        ))
552    }
553
554    /// Restart a stopped DB instance: spin up the backing container and
555    /// flip the row back to `available`. Mirrors AWS `StartDBInstance`.
556    async fn start_db_instance(
557        &self,
558        request: &AwsRequest,
559    ) -> Result<AwsResponse, AwsServiceError> {
560        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
561
562        // Look up the instance first so a missing identifier returns the
563        // declared `DBInstanceNotFoundFault` error rather than a 503 from a
564        // missing container runtime.
565        let instance = {
566            let accounts = self.state.read();
567            let empty = RdsState::new(&request.account_id, &request.region);
568            let state = accounts.get(&request.account_id).unwrap_or(&empty);
569            state
570                .instances
571                .get(&db_instance_identifier)
572                .cloned()
573                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
574        };
575
576        // Flip to `starting` so concurrent DescribeDBInstances callers
577        // see the in-flight state.
578        {
579            let mut accounts = self.state.write();
580            let state = accounts.get_or_create(&request.account_id);
581            if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
582                inst.db_instance_status = "starting".to_string();
583            }
584        }
585
586        // No container runtime configured: fail synchronously (fast) rather
587        // than report success against a backend that does not exist, rolling
588        // the row back from `starting` so subsequent calls don't see a stuck
589        // state.
590        let Some(runtime) = self.runtime.clone() else {
591            {
592                let mut accounts = self.state.write();
593                let state = accounts.get_or_create(&request.account_id);
594                if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
595                    inst.db_instance_status = "stopped".to_string();
596                }
597            }
598            return Err(AwsServiceError::aws_error(
599                StatusCode::SERVICE_UNAVAILABLE,
600                "InternalFailure",
601                "Container runtime is not configured; cannot start DB instance",
602            ));
603        };
604
605        // Background the container start + readiness wait and return
606        // immediately with `starting`. `ensure_postgres` can pull a cold image
607        // and wait for engine readiness (3-6 min for Oracle/SQL Server/Db2),
608        // far past the ~60s client read timeout — awaiting it inline timed the
609        // CLI out (bug-hunt 2026-06-24, 3.2). CreateDBInstance already
610        // backgrounds this exact call.
611        {
612            let state_handle = self.state.clone();
613            let delivery_bus = self.delivery_bus.clone();
614            let snapshot_store = self.snapshot_store.clone();
615            let snapshot_lock = self.snapshot_lock.clone();
616            let id = db_instance_identifier.clone();
617            let account_id = request.account_id.clone();
618            let region = request.region.clone();
619            let inst = instance.clone();
620            tokio::spawn(async move {
621                let logical_db = inst
622                    .db_name
623                    .clone()
624                    .unwrap_or_else(|| default_db_name(&inst.engine).to_string());
625                match runtime
626                    .ensure_postgres(
627                        &id,
628                        &inst.engine,
629                        &inst.engine_version,
630                        &inst.master_username,
631                        &inst.master_user_password,
632                        &logical_db,
633                        &account_id,
634                        &region,
635                        &inst.tags,
636                    )
637                    .await
638                {
639                    Ok(r) => {
640                        let arn = {
641                            let mut accounts = state_handle.write();
642                            let state = accounts.get_or_create(&account_id);
643                            let Some(inst) = state.instances.get_mut(&id) else {
644                                return;
645                            };
646                            inst.db_instance_status = "available".to_string();
647                            inst.endpoint_address = r.endpoint_address.clone();
648                            inst.port = i32::from(r.endpoint_port);
649                            inst.host_port = r.host_port;
650                            inst.container_id = r.container_id;
651                            inst.db_instance_arn.clone()
652                        };
653                        emit_event_static_with_state(
654                            delivery_bus.as_ref(),
655                            Some(&state_handle),
656                            Some(&account_id),
657                            RdsSourceType::DbInstance,
658                            &id,
659                            &arn,
660                            "RDS-EVENT-0088",
661                            &["notification"],
662                            "DB instance started",
663                        );
664                        save_snapshot_static(state_handle.clone(), snapshot_store, snapshot_lock)
665                            .await;
666                    }
667                    Err(_) => {
668                        // Roll back to `stopped` so the next Describe doesn't
669                        // report a permanently-`starting` instance.
670                        let mut accounts = state_handle.write();
671                        let state = accounts.get_or_create(&account_id);
672                        if let Some(inst) = state.instances.get_mut(&id) {
673                            inst.db_instance_status = "stopped".to_string();
674                        }
675                    }
676                }
677            });
678        }
679
680        Ok(AwsResponse::xml(
681            StatusCode::OK,
682            query_response_xml(
683                "StartDBInstance",
684                RDS_NS,
685                &format!(
686                    "<DBInstance>{}</DBInstance>",
687                    db_instance_xml(&instance, Some("starting"))
688                ),
689                &request.request_id,
690            ),
691        ))
692    }
693}
694
695/// Persist the current `RdsState` to the configured snapshot store. Free
696/// function so background tasks (e.g. the create-DB-instance container-start
697/// task) can save without holding a `&RdsService`. Returns immediately when
698/// Whether the given AWS error code is in the AddTagsToResource Smithy
699/// model's declared error set. Used so undeclared `*NotFound` codes
700/// (OptionGroup, ParameterGroup, EventSubscription, SecurityGroup) get
701/// swallowed into a no-op rather than surfaced as a non-modelled error.
702fn is_declared_add_tags_not_found(code: &str) -> bool {
703    matches!(
704        code,
705        "BlueGreenDeploymentNotFoundFault"
706            | "DBClusterNotFoundFault"
707            | "DBInstanceNotFound"
708            | "DBProxyEndpointNotFoundFault"
709            | "DBProxyNotFoundFault"
710            | "DBProxyTargetGroupNotFoundFault"
711            | "DBShardGroupNotFound"
712            | "DBSnapshotNotFound"
713            | "DBSnapshotTenantDatabaseNotFoundFault"
714            | "IntegrationNotFoundFault"
715            | "InvalidDBClusterEndpointStateFault"
716            | "InvalidDBClusterStateFault"
717            | "InvalidDBInstanceState"
718            | "TenantDatabaseNotFound"
719    )
720}
721
722/// no store is configured (memory-mode runs).
723async fn save_snapshot_static(
724    state: SharedRdsState,
725    store: Option<Arc<dyn SnapshotStore>>,
726    lock: Arc<AsyncMutex<()>>,
727) {
728    let Some(store) = store else {
729        return;
730    };
731    let _guard = lock.lock().await;
732    let snapshot = RdsSnapshot {
733        schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
734        state: None,
735        accounts: Some(state.read().clone()),
736    };
737    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
738        let bytes = serde_json::to_vec(&snapshot)
739            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
740        store.save(&bytes)
741    })
742    .await;
743    match join {
744        Ok(Ok(())) => {}
745        Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
746        Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
747    }
748}
749
impl RdsService {
    /// Resolve the container runtime, or fail with a declared RDS error when
    /// none was configured.
    ///
    /// RDS operations that start, stop, or reach into a database container
    /// fail with one consistent wire error when the daemon (Docker/Podman) is
    /// missing, rather than each caller restating the message. The error is
    /// sent with HTTP 503 (`SERVICE_UNAVAILABLE`) but carries the error code
    /// `InsufficientDBInstanceCapacity`: that code is declared on every op
    /// that calls `require_runtime` (Create/Modify/Restore* DB instance and
    /// Read Replica), and is the closest Smithy-modelled analogue for
    /// "fakecloud can't satisfy this DB request right now".
    ///
    /// # Errors
    ///
    /// Returns `InsufficientDBInstanceCapacity` when `self.runtime` is `None`.
    fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
        self.runtime.as_ref().ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::SERVICE_UNAVAILABLE,
                "InsufficientDBInstanceCapacity",
                "Docker/Podman is required for RDS DB instances but is not available",
            )
        })
    }

    /// Background the container start + optional data-replay for a
    /// restore/replica op.
    ///
    /// The caller inserts a `creating` placeholder row and returns
    /// immediately; this spawns `ensure_postgres` (which can pull a cold
    /// image and wait minutes for engine readiness, far past the ~60s client
    /// read timeout) and, when `dump` is present, replays it with
    /// `restore_database` before flipping the row to `available`. Mirrors
    /// `create_db_instance`'s backgrounding so restore/replica no longer time
    /// the client out (bug-hunt 2026-07-01, Tier-0).
    ///
    /// Outcomes of the detached task (its `JoinHandle` is not kept):
    /// - container start or data replay fails: the placeholder row is
    ///   removed and unlinked from every other instance's read-replica list,
    ///   the container is stopped, state is persisted, and `RDS-EVENT-0058`
    ///   (category `failure`) is emitted;
    /// - the row vanished while creating (deleted meanwhile): the orphaned
    ///   container is stopped and state persisted; no event is emitted;
    /// - otherwise: the row is marked `available` with the container's
    ///   endpoint address, port, host port and container id, the
    ///   `created_event` `(event id, message)` pair is emitted under the
    ///   `creation` category, and state is persisted.
    // Every value below is moved (owned) into the detached task.
    #[allow(clippy::too_many_arguments)]
    fn spawn_finalize_restored_instance(
        &self,
        runtime: Arc<RdsRuntime>,
        account_id: String,
        region: String,
        id: String,
        arn: String,
        engine: String,
        engine_version: String,
        master_username: String,
        master_user_password: String,
        logical_db: String,
        tags: Vec<RdsTag>,
        dump: Option<Vec<u8>>,
        created_event: (&'static str, &'static str),
    ) {
        // Clone the shared handles up front so the `'static` task does not
        // borrow `self`.
        let state_handle = self.state.clone();
        let delivery_bus = self.delivery_bus.clone();
        let snapshot_store = self.snapshot_store.clone();
        let snapshot_lock = self.snapshot_lock.clone();
        let (event_id, event_message) = created_event;
        // Shared failure path: drop the placeholder row, reap any container the
        // runtime managed to start, persist, and emit the create-failure event.
        // NOTE(review): the failure event goes through `emit_event_static`
        // (no state/account), whereas the success path below uses
        // `emit_event_static_with_state`; confirm whether state-backed event
        // subscriptions should also observe failures.
        async fn fail(
            state_handle: &SharedRdsState,
            snapshot_store: Option<Arc<dyn SnapshotStore>>,
            snapshot_lock: Arc<AsyncMutex<()>>,
            delivery_bus: Option<&Arc<DeliveryBus>>,
            runtime: &Arc<RdsRuntime>,
            account_id: &str,
            id: &str,
            arn: &str,
            error: &str,
        ) {
            tracing::error!(%error, db_instance_identifier=%id, "restore/replica background finalize failed");
            // The write guard is confined to this block so it is dropped
            // before the `.await`s that follow.
            {
                let mut accounts = state_handle.write();
                let state = accounts.get_or_create(account_id);
                state.instances.remove(id);
                // A read replica registered itself against its source
                // synchronously; drop that reverse linkage so the source
                // doesn't keep a dangling replica id after this failure.
                for inst in state.instances.values_mut() {
                    inst.read_replica_db_instance_identifiers
                        .retain(|r| r != id);
                }
            }
            runtime.stop_container(id).await;
            save_snapshot_static(state_handle.clone(), snapshot_store, snapshot_lock).await;
            emit_event_static(
                delivery_bus,
                RdsSourceType::DbInstance,
                id,
                arn,
                "RDS-EVENT-0058",
                &["failure"],
                &format!("DB instance failed to create: {error}"),
            );
        }
        tokio::spawn(async move {
            // Step 1: bring up the engine container. Despite the name this
            // is passed `engine`, so it presumably serves non-Postgres
            // engines too — confirm in `RdsRuntime`.
            let running = match runtime
                .ensure_postgres(
                    &id,
                    &engine,
                    &engine_version,
                    &master_username,
                    &master_user_password,
                    &logical_db,
                    &account_id,
                    &region,
                    &tags,
                )
                .await
            {
                Ok(running) => running,
                Err(error) => {
                    fail(
                        &state_handle,
                        snapshot_store,
                        snapshot_lock,
                        delivery_bus.as_ref(),
                        &runtime,
                        &account_id,
                        &id,
                        &arn,
                        &error.to_string(),
                    )
                    .await;
                    return;
                }
            };

            // Step 2 (optional): replay the source data into the new engine.
            if let Some(dump) = dump {
                if let Err(error) = runtime
                    .restore_database(
                        &id,
                        &engine,
                        &master_username,
                        &master_user_password,
                        &logical_db,
                        &dump,
                    )
                    .await
                {
                    // A failed data replay must NOT be reported as a successful
                    // `available` restore with missing data — fail the instance.
                    fail(
                        &state_handle,
                        snapshot_store,
                        snapshot_lock,
                        delivery_bus.as_ref(),
                        &runtime,
                        &account_id,
                        &id,
                        &arn,
                        &error.to_string(),
                    )
                    .await;
                    return;
                }
            }

            // Step 3: publish the container's connection details on the row,
            // if the row still exists. The write guard is released at the end
            // of this block, before any `.await` below.
            // NOTE(review): the status is overwritten regardless of its current
            // value; confirm the delete path removes the row (rather than only
            // marking it) so a concurrent delete is not resurrected here.
            let instance_present = {
                let mut accounts = state_handle.write();
                let state = accounts.get_or_create(&account_id);
                if let Some(inst) = state.instances.get_mut(&id) {
                    inst.db_instance_status = "available".to_string();
                    inst.endpoint_address = running.endpoint_address.clone();
                    inst.port = i32::from(running.endpoint_port);
                    inst.host_port = running.host_port;
                    inst.container_id = running.container_id.clone();
                    true
                } else {
                    false
                }
            };
            if !instance_present {
                // Deleted while creating: reap the orphaned backing container.
                runtime.stop_container(&id).await;
                save_snapshot_static(state_handle.clone(), snapshot_store, snapshot_lock).await;
                return;
            }
            emit_event_static_with_state(
                delivery_bus.as_ref(),
                Some(&state_handle),
                Some(&account_id),
                RdsSourceType::DbInstance,
                &id,
                &arn,
                event_id,
                &["creation"],
                event_message,
            );
            save_snapshot_static(state_handle.clone(), snapshot_store, snapshot_lock).await;
        });
    }

    /// Build a hook that persists the current state when invoked, or `None` in
    /// memory mode (no snapshot store configured). The CloudFormation
    /// provisioner mutates `state` directly and uses this to write a
    /// CFN-provisioned resource through to disk.
    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
        // Early-return `None` when there is no store to write to.
        let store = self.snapshot_store.clone()?;
        let state = self.state.clone();
        let lock = self.snapshot_lock.clone();
        Some(Arc::new(move || {
            // Each invocation gets its own handles so the returned future
            // owns everything it touches.
            let state = state.clone();
            let store = store.clone();
            let lock = lock.clone();
            Box::pin(async move {
                save_snapshot_static(state, Some(store), lock).await;
            })
        }))
    }
}
955
956#[async_trait]
957impl AwsService for RdsService {
958    fn service_name(&self) -> &str {
959        "rds"
960    }
961
962    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
963        // Centralized Smithy-aligned validation. Returns the appropriate
964        // `MissingParameter` / `InvalidParameterValue` error before the
965        // per-action handler runs. Actions without a constraint entry
966        // fall through unchanged.
967        crate::validation::prevalidate(request.action.as_str(), &request)?;
968
969        let mutates = is_mutating_action(request.action.as_str());
970        let result = match request.action.as_str() {
971            "AddTagsToResource" => self.add_tags_to_resource(&request),
972            "CreateDBInstance" => self.create_db_instance(&request).await,
973            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
974            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
975            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
976            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
977            "DeleteDBInstance" => self.delete_db_instance(&request).await,
978            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
979            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
980            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
981            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
982            "DescribeDBInstances" => self.describe_db_instances(&request),
983            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
984            "DescribeDBParameters" => self.describe_db_parameters_real(&request),
985            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
986            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
987            "DescribeOrderableDBInstanceOptions" => {
988                self.describe_orderable_db_instance_options(&request)
989            }
990            "ListTagsForResource" => self.list_tags_for_resource(&request),
991            "ModifyDBInstance" => self.modify_db_instance(&request),
992            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
993            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
994            "RebootDBInstance" => self.reboot_db_instance(&request).await,
995            "StartDBInstance" => self.start_db_instance(&request).await,
996            "StopDBInstance" => self.stop_db_instance(&request).await,
997            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
998            "RestoreDBInstanceFromDBSnapshot" => {
999                self.restore_db_instance_from_db_snapshot(&request).await
1000            }
1001            "RestoreDBInstanceToPointInTime" => {
1002                self.restore_db_instance_to_point_in_time(&request).await
1003            }
1004            "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
1005            "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
1006            "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
1007            "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
1008            "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
1009            "RestoreDBClusterToPointInTime" => {
1010                self.restore_db_cluster_to_point_in_time(&request).await
1011            }
1012            _ => self.handle_extra_action(&request),
1013        };
1014        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
1015            self.save_snapshot().await;
1016        }
1017        result
1018    }
1019
1020    fn supported_actions(&self) -> &[&str] {
1021        SUPPORTED_ACTIONS
1022    }
1023}
1024
1026
1027/// Render a single user-set parameter as the XML shape AWS emits inside
1028/// `DescribeDB(Cluster)Parameters` responses. We don't store metadata
1029/// alongside user values so we report `dynamic`/`string` defaults.
1030pub(crate) fn render_user_parameter_xml(name: &str, value: &str, apply_method: &str) -> String {
1031    format!(
1032        "      <Parameter>\n        <ParameterName>{}</ParameterName>\n        <ParameterValue>{}</ParameterValue>\n        <Source>user</Source>\n        <ApplyType>dynamic</ApplyType>\n        <ApplyMethod>{}</ApplyMethod>\n        <DataType>string</DataType>\n        <IsModifiable>true</IsModifiable>\n      </Parameter>\n",
1033        xml_escape(name),
1034        xml_escape(value),
1035        xml_escape(apply_method),
1036    )
1037}
1038
1039/// Render a single engine-default parameter as the XML shape AWS emits
1040/// inside `DescribeDB(Cluster)Parameters` and
1041/// `DescribeEngineDefault(Cluster)Parameters` responses.
1042pub(crate) fn render_engine_default_parameter_xml(
1043    default: &crate::state::EngineDefaultParameter,
1044) -> String {
1045    format!(
1046        "      <Parameter>\n        <ParameterName>{}</ParameterName>\n        <ParameterValue>{}</ParameterValue>\n        <Source>engine-default</Source>\n        <ApplyType>{}</ApplyType>\n        <DataType>{}</DataType>\n        <AllowedValues>{}</AllowedValues>\n        <IsModifiable>{}</IsModifiable>\n      </Parameter>\n",
1047        xml_escape(default.name),
1048        xml_escape(default.value),
1049        xml_escape(default.apply_type),
1050        xml_escape(default.data_type),
1051        xml_escape(default.allowed_values),
1052        default.is_modifiable,
1053    )
1054}
1055
1056/// Parse `Parameters.{Parameter|member}.N.{ParameterName,ParameterValue,ApplyMethod}`
1057/// from a Query-protocol request. AWS RDS uses `Parameters.Parameter.N`
1058/// (the `Parameter` list location name from the Smithy model); we also
1059/// accept the generic `Parameters.member.N` form so hand-built clients
1060/// using the default Query list shape keep working. Skips members
1061/// missing a name or value. `ApplyMethod` defaults to `immediate` (AWS's
1062/// default) and is preserved so a `Describe` round-trip echoes it back —
1063/// the Terraform provider sets `apply_method = "immediate"` by default and
1064/// drifts if the read-back omits it.
1065pub(crate) struct DbParameterInput {
1066    pub name: String,
1067    pub value: String,
1068    pub apply_method: String,
1069}
1070
1071pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<DbParameterInput> {
1072    let mut out = Vec::new();
1073    for prefix in ["Parameters.Parameter", "Parameters.member"] {
1074        let mut index = 1;
1075        loop {
1076            let name_key = format!("{prefix}.{index}.ParameterName");
1077            let value_key = format!("{prefix}.{index}.ParameterValue");
1078            let apply_key = format!("{prefix}.{index}.ApplyMethod");
1079            let name = optional_query_param(request, &name_key);
1080            let value = optional_query_param(request, &value_key);
1081            if name.is_none() && value.is_none() {
1082                break;
1083            }
1084            if let (Some(n), Some(v)) = (name, value) {
1085                if !n.is_empty() {
1086                    let apply_method = optional_query_param(request, &apply_key)
1087                        .filter(|m| !m.is_empty())
1088                        .unwrap_or_else(|| "immediate".to_string());
1089                    out.push(DbParameterInput {
1090                        name: n,
1091                        value: v,
1092                        apply_method,
1093                    });
1094                }
1095            }
1096            index += 1;
1097        }
1098    }
1099    out
1100}
1101
/// Translate an AWS-style RDS log file name into a path inside the engine
/// container.
///
/// `error/postgres.log` and `trace/postgres-trace.log` both map to
/// `/var/log/postgresql/postgresql.log` whatever `engine` is; the MySQL
/// error and slow-query logs are mapped only when `engine` is `mysql` or
/// `mariadb`. Any other name is returned unchanged so callers can also
/// fetch arbitrary in-container paths.
fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
    let mysql_family = matches!(engine, "mysql" | "mariadb");
    let known_path = match log_file_name {
        "error/postgres.log" | "trace/postgres-trace.log" => {
            Some("/var/log/postgresql/postgresql.log")
        }
        "error/mysql-error.log" if mysql_family => Some("/var/log/mysql/error.log"),
        "slowquery/mysql-slowquery.log" if mysql_family => Some("/var/log/mysql/slow.log"),
        _ => None,
    };
    known_path.unwrap_or(log_file_name).to_owned()
}
1116
/// One page of list results.
pub(crate) struct PaginationResult<T> {
    /// Entries belonging to this page.
    items: Vec<T>,
    /// Marker to send back for the following page — presumably `None` once
    /// the last page is reached (confirm against the helper that builds it).
    next_marker: Option<String>,
}
1121
1122/// Attach `instance_id` to the cluster's `DBClusterMembers` array,
1123/// promoting it to writer when the cluster has none. Idempotent:
1124/// re-attaching an existing member is a no-op.
1125fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
1126    use serde_json::{json, Value};
1127    let Some(map) = state.extras.get_mut("clusters") else {
1128        return;
1129    };
1130    let Some(entry) = map.get_mut(cluster_id) else {
1131        return;
1132    };
1133    let Some(obj) = entry.as_object_mut() else {
1134        return;
1135    };
1136    let mut members: Vec<Value> = obj
1137        .get("DBClusterMembers")
1138        .and_then(|v| v.as_array())
1139        .cloned()
1140        .unwrap_or_default();
1141    if members
1142        .iter()
1143        .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
1144    {
1145        return;
1146    }
1147    let has_writer = members
1148        .iter()
1149        .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
1150    let promotion_tier = (members.len() as i64) + 1;
1151    members.push(json!({
1152        "DBInstanceIdentifier": instance_id,
1153        "IsClusterWriter": !has_writer,
1154        "DBClusterParameterGroupStatus": "in-sync",
1155        "PromotionTier": promotion_tier,
1156    }));
1157    obj.insert("DBClusterMembers".to_string(), Value::Array(members));
1158    if !has_writer {
1159        obj.insert(
1160            "WriterDBInstanceIdentifier".to_string(),
1161            Value::String(instance_id.to_string()),
1162        );
1163    }
1164}
1165
1166#[path = "../service_helpers.rs"]
1167pub(crate) mod service_helpers;
1168pub(crate) use service_helpers::*;
1169
1170#[cfg(test)]
1171#[path = "../service_tests.rs"]
1172mod tests;