Skip to main content

fakecloud_rds/service/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20    SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
/// XML namespace passed to `query_response_xml` when building RDS Query API
/// response envelopes (`2014-10-31` is the RDS API version in the URI).
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
/// RDS Query API action names this service advertises as supported.
///
/// NOTE(review): presumably surfaced through the `AwsService` impl for
/// routing/introspection — the consumer is not visible in this chunk; confirm.
/// Entries are currently listed in ASCII order; keep it that way when adding
/// new actions so the list stays easy to diff against the AWS API reference.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddRoleToDBCluster",
    "AddRoleToDBInstance",
    "AddSourceIdentifierToSubscription",
    "AddTagsToResource",
    "ApplyPendingMaintenanceAction",
    "AuthorizeDBSecurityGroupIngress",
    "BacktrackDBCluster",
    "CancelExportTask",
    "CopyDBClusterParameterGroup",
    "CopyDBClusterSnapshot",
    "CopyDBParameterGroup",
    "CopyDBSnapshot",
    "CopyOptionGroup",
    "CreateBlueGreenDeployment",
    "CreateCustomDBEngineVersion",
    "CreateDBCluster",
    "CreateDBClusterEndpoint",
    "CreateDBClusterParameterGroup",
    "CreateDBClusterSnapshot",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBProxy",
    "CreateDBProxyEndpoint",
    "CreateDBSecurityGroup",
    "CreateDBShardGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "CreateEventSubscription",
    "CreateGlobalCluster",
    "CreateIntegration",
    "CreateOptionGroup",
    "CreateTenantDatabase",
    "DeleteBlueGreenDeployment",
    "DeleteCustomDBEngineVersion",
    "DeleteDBCluster",
    "DeleteDBClusterAutomatedBackup",
    "DeleteDBClusterEndpoint",
    "DeleteDBClusterParameterGroup",
    "DeleteDBClusterSnapshot",
    "DeleteDBInstance",
    "DeleteDBInstanceAutomatedBackup",
    "DeleteDBParameterGroup",
    "DeleteDBProxy",
    "DeleteDBProxyEndpoint",
    "DeleteDBSecurityGroup",
    "DeleteDBShardGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DeleteEventSubscription",
    "DeleteGlobalCluster",
    "DeleteIntegration",
    "DeleteOptionGroup",
    "DeleteTenantDatabase",
    "DeregisterDBProxyTargets",
    "DescribeAccountAttributes",
    "DescribeBlueGreenDeployments",
    "DescribeCertificates",
    "DescribeDBClusterAutomatedBackups",
    "DescribeDBClusterBacktracks",
    "DescribeDBClusterEndpoints",
    "DescribeDBClusterParameterGroups",
    "DescribeDBClusterParameters",
    "DescribeDBClusterSnapshotAttributes",
    "DescribeDBClusterSnapshots",
    "DescribeDBClusters",
    "DescribeDBEngineVersions",
    "DescribeDBInstanceAutomatedBackups",
    "DescribeDBInstances",
    "DescribeDBLogFiles",
    "DescribeDBMajorEngineVersions",
    "DescribeDBParameterGroups",
    "DescribeDBParameters",
    "DescribeDBProxies",
    "DescribeDBProxyEndpoints",
    "DescribeDBProxyTargetGroups",
    "DescribeDBProxyTargets",
    "DescribeDBRecommendations",
    "DescribeDBSecurityGroups",
    "DescribeDBShardGroups",
    "DescribeDBSnapshotAttributes",
    "DescribeDBSnapshotTenantDatabases",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeEngineDefaultClusterParameters",
    "DescribeEngineDefaultParameters",
    "DescribeEventCategories",
    "DescribeEventSubscriptions",
    "DescribeEvents",
    "DescribeExportTasks",
    "DescribeGlobalClusters",
    "DescribeIntegrations",
    "DescribeOptionGroupOptions",
    "DescribeOptionGroups",
    "DescribeOrderableDBInstanceOptions",
    "DescribePendingMaintenanceActions",
    "DescribeReservedDBInstances",
    "DescribeReservedDBInstancesOfferings",
    "DescribeServerlessV2PlatformVersions",
    "DescribeSourceRegions",
    "DescribeTenantDatabases",
    "DescribeValidDBInstanceModifications",
    "DisableHttpEndpoint",
    "DownloadDBLogFilePortion",
    "EnableHttpEndpoint",
    "FailoverDBCluster",
    "FailoverGlobalCluster",
    "ListTagsForResource",
    "ModifyActivityStream",
    "ModifyCertificates",
    "ModifyCurrentDBClusterCapacity",
    "ModifyCustomDBEngineVersion",
    "ModifyDBCluster",
    "ModifyDBClusterEndpoint",
    "ModifyDBClusterParameterGroup",
    "ModifyDBClusterSnapshotAttribute",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBProxy",
    "ModifyDBProxyEndpoint",
    "ModifyDBProxyTargetGroup",
    "ModifyDBRecommendation",
    "ModifyDBShardGroup",
    "ModifyDBSnapshot",
    "ModifyDBSnapshotAttribute",
    "ModifyDBSubnetGroup",
    "ModifyEventSubscription",
    "ModifyGlobalCluster",
    "ModifyIntegration",
    "ModifyOptionGroup",
    "ModifyTenantDatabase",
    "PromoteReadReplica",
    "PromoteReadReplicaDBCluster",
    "PurchaseReservedDBInstancesOffering",
    "RebootDBCluster",
    "RebootDBInstance",
    "RebootDBShardGroup",
    "RegisterDBProxyTargets",
    "RemoveFromGlobalCluster",
    "RemoveRoleFromDBCluster",
    "RemoveRoleFromDBInstance",
    "RemoveSourceIdentifierFromSubscription",
    "RemoveTagsFromResource",
    "ResetDBClusterParameterGroup",
    "ResetDBParameterGroup",
    "RestoreDBClusterFromS3",
    "RestoreDBClusterFromSnapshot",
    "RestoreDBClusterToPointInTime",
    "RestoreDBInstanceFromDBSnapshot",
    "RestoreDBInstanceFromS3",
    "RestoreDBInstanceToPointInTime",
    "RevokeDBSecurityGroupIngress",
    "StartActivityStream",
    "StartDBCluster",
    "StartDBInstance",
    "StartDBInstanceAutomatedBackupsReplication",
    "StartExportTask",
    "StopActivityStream",
    "StopDBCluster",
    "StopDBInstance",
    "StopDBInstanceAutomatedBackupsReplication",
    "SwitchoverBlueGreenDeployment",
    "SwitchoverGlobalCluster",
    "SwitchoverReadReplica",
];
191
/// The RDS Query-protocol service. Dispatches actions against the shared
/// per-account state and, when configured, a container runtime, a snapshot
/// store and an EventBridge delivery bus (all attached via `with_*` builders).
pub struct RdsService {
    /// Per-account RDS state; cloned into background tasks and crate-visible
    /// so sibling handler modules can reach it.
    pub(crate) state: SharedRdsState,
    /// Docker/Podman runtime hosting DB instance backends; `None` when no
    /// runtime was attached.
    runtime: Option<Arc<RdsRuntime>>,
    /// Where state snapshots are persisted; `None` in memory mode, where
    /// snapshot saves return immediately.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Held for the duration of each snapshot save so writes don't interleave.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Bus for `aws.rds` EventBridge events; `None` in tests/minimal configs.
    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
}
199
/// Source type for RDS EventBridge events. Maps `aws.rds` detail-type.
///
/// Each variant is rendered three ways: as the EventBridge `SourceType`
/// (SCREAMING_SNAKE), as the `DescribeEvents` `SourceType` (kebab-case) and
/// as the EventBridge detail-type string.
#[derive(Clone, Copy)]
// `enum_variant_names`: every variant shares the `Db` prefix on purpose, to
// mirror AWS's own source-type names. `dead_code`: NOTE(review) presumably
// not every variant is constructed yet (in the visible code only `DbInstance`
// is) — confirm before dropping the allow.
#[allow(dead_code, clippy::enum_variant_names)]
pub(crate) enum RdsSourceType {
    DbInstance,
    DbSnapshot,
    DbParameterGroup,
    DbCluster,
    DbClusterSnapshot,
}
210
211impl RdsSourceType {
212    /// EventBridge `SourceType` enum string. Matches the SCREAMING_SNAKE
213    /// form AWS publishes in the `aws.rds` event detail.
214    fn as_str(self) -> &'static str {
215        match self {
216            Self::DbInstance => "DB_INSTANCE",
217            Self::DbSnapshot => "DB_SNAPSHOT",
218            Self::DbParameterGroup => "DB_PARAMETER_GROUP",
219            Self::DbCluster => "DB_CLUSTER",
220            Self::DbClusterSnapshot => "DB_CLUSTER_SNAPSHOT",
221        }
222    }
223
224    /// `DescribeEvents` `SourceType` filter / response value. Per AWS
225    /// API spec this is the kebab-case form (`db-instance`,
226    /// `db-cluster`, `db-snapshot`, `db-parameter-group`, ...) — distinct
227    /// from the EventBridge `SourceType` returned by [`Self::as_str`].
228    pub(crate) fn describe_events_str(self) -> &'static str {
229        match self {
230            Self::DbInstance => "db-instance",
231            Self::DbSnapshot => "db-snapshot",
232            Self::DbParameterGroup => "db-parameter-group",
233            Self::DbCluster => "db-cluster",
234            Self::DbClusterSnapshot => "db-cluster-snapshot",
235        }
236    }
237
238    fn detail_type(self) -> &'static str {
239        match self {
240            Self::DbInstance => "RDS DB Instance Event",
241            Self::DbSnapshot => "RDS DB Snapshot Event",
242            Self::DbParameterGroup => "RDS DB Parameter Group Event",
243            Self::DbCluster => "RDS DB Cluster Event",
244            Self::DbClusterSnapshot => "RDS DB Cluster Snapshot Event",
245        }
246    }
247}
248
249mod cluster_snapshots;
250mod engine;
251mod instances;
252mod log_files;
253mod parameter_groups;
254mod replicas;
255mod restore;
256mod snapshots;
257mod subnet_groups;
258mod tags;
259
260impl RdsService {
261    pub(crate) fn state_handle(&self) -> &SharedRdsState {
262        &self.state
263    }
264}
265
266impl RdsService {
267    pub fn new(state: SharedRdsState) -> Self {
268        Self {
269            state,
270            runtime: None,
271            snapshot_store: None,
272            snapshot_lock: Arc::new(AsyncMutex::new(())),
273            delivery_bus: None,
274        }
275    }
276
277    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
278        self.runtime = Some(runtime);
279        self
280    }
281
282    /// Crate-internal accessor for the optional runtime; needed by the
283    /// extras handler so cluster snapshot/restore paths can dump and
284    /// replay member databases via the live container runtime.
285    pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
286        self.runtime.as_ref()
287    }
288
289    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
290        self.snapshot_store = Some(store);
291        self
292    }
293
294    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
295        self.delivery_bus = Some(bus);
296        self
297    }
298
299    /// Emit an `aws.rds` EventBridge event mirroring the AWS RDS event schema.
300    /// Also records into the per-account events ring so DescribeEvents
301    /// can serve the row. No-op for the EventBridge side when the bus
302    /// isn't wired (tests, minimal configs).
303    pub(crate) fn emit_event(
304        &self,
305        source_type: RdsSourceType,
306        source_identifier: &str,
307        source_arn: &str,
308        event_id: &str,
309        event_categories: &[&str],
310        message: &str,
311    ) {
312        // Source the account_id off the source_arn (segment 4) — that's
313        // the canonical ARN form for RDS resources.
314        let account_id = source_arn.split(':').nth(4).unwrap_or("");
315        emit_event_static_with_state(
316            self.delivery_bus.as_ref(),
317            Some(&self.state),
318            if account_id.is_empty() {
319                None
320            } else {
321                Some(account_id)
322            },
323            source_type,
324            source_identifier,
325            source_arn,
326            event_id,
327            event_categories,
328            message,
329        );
330    }
331
332    async fn save_snapshot(&self) {
333        save_snapshot_static(
334            self.state.clone(),
335            self.snapshot_store.clone(),
336            self.snapshot_lock.clone(),
337        )
338        .await;
339    }
340
341    /// Recreate the backing Docker/Podman containers for persisted DB
342    /// instances after a fakecloud restart. Without this, persistent
343    /// mode loads the row back into memory with `db_instance_status =
344    /// available` but the container is gone, so the endpoint is dead
345    /// (issue #1338). Each candidate is flipped to `starting`
346    /// synchronously, then a background task brings the container back
347    /// and flips it back to `available`. Instances persisted as
348    /// `stopped` are skipped — `StartDBInstance` revives those.
349    pub async fn recover_persisted_containers(&self) {
350        let Some(runtime) = self.runtime.clone() else {
351            return;
352        };
353
354        struct Pending {
355            account_id: String,
356            region: String,
357            id: String,
358            arn: String,
359            engine: String,
360            engine_version: String,
361            username: String,
362            password: String,
363            db_name: String,
364            tags: Vec<crate::state::RdsTag>,
365        }
366
367        let pending: Vec<Pending> = {
368            let mut accounts = self.state.write();
369            let mut out = Vec::new();
370            for (_, state) in accounts.iter_mut() {
371                let account_id = state.account_id.clone();
372                let region = state.region.clone();
373                for (id, inst) in state.instances.iter_mut() {
374                    // "creating" is included so an instance whose background
375                    // create task hadn't finished (and re-saved it as
376                    // "available") when the process crashed is resumed rather
377                    // than silently dropped on restart — the API already
378                    // returned it to the client, so DescribeDBInstances must
379                    // not lose it (bug-hunt 2026-06-13, finding 4.3). Recovery
380                    // re-drives it through `ensure_*` to a live container.
381                    if !matches!(
382                        inst.db_instance_status.as_str(),
383                        "creating"
384                            | "available"
385                            | "starting"
386                            | "modifying"
387                            | "rebooting"
388                            | "backing-up"
389                    ) {
390                        continue;
391                    }
392                    inst.db_instance_status = "starting".to_string();
393                    out.push(Pending {
394                        account_id: account_id.clone(),
395                        region: region.clone(),
396                        id: id.clone(),
397                        arn: inst.db_instance_arn.clone(),
398                        engine: inst.engine.clone(),
399                        engine_version: inst.engine_version.clone(),
400                        username: inst.master_username.clone(),
401                        password: inst.master_user_password.clone(),
402                        db_name: inst
403                            .db_name
404                            .clone()
405                            .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
406                        tags: inst.tags.clone(),
407                    });
408                }
409            }
410            out
411        };
412
413        if pending.is_empty() {
414            return;
415        }
416        tracing::info!(
417            count = pending.len(),
418            "recovering backing containers for persisted rds instances",
419        );
420
421        for p in pending {
422            let runtime = runtime.clone();
423            let state = self.state.clone();
424            let snapshot_store = self.snapshot_store.clone();
425            let snapshot_lock = self.snapshot_lock.clone();
426            let delivery_bus = self.delivery_bus.clone();
427            tokio::spawn(async move {
428                match runtime
429                    .ensure_postgres(
430                        &p.id,
431                        &p.engine,
432                        &p.engine_version,
433                        &p.username,
434                        &p.password,
435                        &p.db_name,
436                        &p.account_id,
437                        &p.region,
438                        &p.tags,
439                    )
440                    .await
441                {
442                    Ok(running) => {
443                        {
444                            let mut accounts = state.write();
445                            if let Some(s) = accounts.get_mut(&p.account_id) {
446                                if let Some(inst) = s.instances.get_mut(&p.id) {
447                                    inst.db_instance_status = "available".to_string();
448                                    inst.endpoint_address = running.endpoint_address.clone();
449                                    inst.port = i32::from(running.endpoint_port);
450                                    inst.host_port = running.host_port;
451                                    inst.container_id = running.container_id;
452                                }
453                            }
454                        }
455                        save_snapshot_static(
456                            state.clone(),
457                            snapshot_store.clone(),
458                            snapshot_lock.clone(),
459                        )
460                        .await;
461                        emit_event_static(
462                            delivery_bus.as_ref(),
463                            RdsSourceType::DbInstance,
464                            &p.id,
465                            &p.arn,
466                            "RDS-EVENT-0088",
467                            &["notification"],
468                            "DB instance restarted after fakecloud restart",
469                        );
470                    }
471                    Err(error) => {
472                        tracing::error!(
473                            %error,
474                            db_instance_identifier = %p.id,
475                            "failed to recover rds backing container after restart",
476                        );
477                        {
478                            let mut accounts = state.write();
479                            if let Some(s) = accounts.get_mut(&p.account_id) {
480                                if let Some(inst) = s.instances.get_mut(&p.id) {
481                                    inst.db_instance_status = "failed".to_string();
482                                }
483                            }
484                        }
485                        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
486                    }
487                }
488            });
489        }
490    }
491
492    /// Stop the backing container for `db_instance_identifier` and mark
493    /// the row `stopped`. Synchronous wrt the runtime so callers see a
494    /// real `stopped` status by the time the response goes back; mirrors
495    /// the AWS contract that `StartDBInstance`/`StopDBInstance` change
496    /// the visible status immediately.
497    async fn stop_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
498        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
499
500        // Look up the instance first so a missing identifier returns the
501        // declared `DBInstanceNotFoundFault` error rather than a 503 from a
502        // missing container runtime. Conformance probes hit Start/Stop with
503        // synthetic identifiers and expect the documented error shape.
504        let arn = {
505            let accounts = self.state.read();
506            let empty = RdsState::new(&request.account_id, &request.region);
507            let state = accounts.get(&request.account_id).unwrap_or(&empty);
508            state
509                .instances
510                .get(&db_instance_identifier)
511                .map(|i| i.db_instance_arn.clone())
512                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
513        };
514
515        if let Some(runtime) = self.runtime.as_ref() {
516            runtime.stop_container(&db_instance_identifier).await;
517        }
518
519        let instance = {
520            let mut accounts = self.state.write();
521            let state = accounts.get_or_create(&request.account_id);
522            let inst = state
523                .instances
524                .get_mut(&db_instance_identifier)
525                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
526            inst.db_instance_status = "stopped".to_string();
527            inst.container_id = String::new();
528            inst.clone()
529        };
530
531        self.emit_event(
532            RdsSourceType::DbInstance,
533            &db_instance_identifier,
534            &arn,
535            "RDS-EVENT-0089",
536            &["notification"],
537            "DB instance stopped",
538        );
539
540        Ok(AwsResponse::xml(
541            StatusCode::OK,
542            query_response_xml(
543                "StopDBInstance",
544                RDS_NS,
545                &format!(
546                    "<DBInstance>{}</DBInstance>",
547                    db_instance_xml(&instance, Some("stopped"))
548                ),
549                &request.request_id,
550            ),
551        ))
552    }
553
554    /// Restart a stopped DB instance: spin up the backing container and
555    /// flip the row back to `available`. Mirrors AWS `StartDBInstance`.
556    async fn start_db_instance(
557        &self,
558        request: &AwsRequest,
559    ) -> Result<AwsResponse, AwsServiceError> {
560        let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
561
562        // Look up the instance first so a missing identifier returns the
563        // declared `DBInstanceNotFoundFault` error rather than a 503 from a
564        // missing container runtime.
565        let instance = {
566            let accounts = self.state.read();
567            let empty = RdsState::new(&request.account_id, &request.region);
568            let state = accounts.get(&request.account_id).unwrap_or(&empty);
569            state
570                .instances
571                .get(&db_instance_identifier)
572                .cloned()
573                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
574        };
575
576        // Flip to `starting` so concurrent DescribeDBInstances callers
577        // see the in-flight state.
578        {
579            let mut accounts = self.state.write();
580            let state = accounts.get_or_create(&request.account_id);
581            if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
582                inst.db_instance_status = "starting".to_string();
583            }
584        }
585
586        // No container runtime configured: fail synchronously (fast) rather
587        // than report success against a backend that does not exist, rolling
588        // the row back from `starting` so subsequent calls don't see a stuck
589        // state.
590        let Some(runtime) = self.runtime.clone() else {
591            {
592                let mut accounts = self.state.write();
593                let state = accounts.get_or_create(&request.account_id);
594                if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
595                    inst.db_instance_status = "stopped".to_string();
596                }
597            }
598            return Err(AwsServiceError::aws_error(
599                StatusCode::SERVICE_UNAVAILABLE,
600                "InternalFailure",
601                "Container runtime is not configured; cannot start DB instance",
602            ));
603        };
604
605        // Background the container start + readiness wait and return
606        // immediately with `starting`. `ensure_postgres` can pull a cold image
607        // and wait for engine readiness (3-6 min for Oracle/SQL Server/Db2),
608        // far past the ~60s client read timeout — awaiting it inline timed the
609        // CLI out (bug-hunt 2026-06-24, 3.2). CreateDBInstance already
610        // backgrounds this exact call.
611        {
612            let state_handle = self.state.clone();
613            let delivery_bus = self.delivery_bus.clone();
614            let snapshot_store = self.snapshot_store.clone();
615            let snapshot_lock = self.snapshot_lock.clone();
616            let id = db_instance_identifier.clone();
617            let account_id = request.account_id.clone();
618            let region = request.region.clone();
619            let inst = instance.clone();
620            tokio::spawn(async move {
621                let logical_db = inst
622                    .db_name
623                    .clone()
624                    .unwrap_or_else(|| default_db_name(&inst.engine).to_string());
625                match runtime
626                    .ensure_postgres(
627                        &id,
628                        &inst.engine,
629                        &inst.engine_version,
630                        &inst.master_username,
631                        &inst.master_user_password,
632                        &logical_db,
633                        &account_id,
634                        &region,
635                        &inst.tags,
636                    )
637                    .await
638                {
639                    Ok(r) => {
640                        let arn = {
641                            let mut accounts = state_handle.write();
642                            let state = accounts.get_or_create(&account_id);
643                            let Some(inst) = state.instances.get_mut(&id) else {
644                                return;
645                            };
646                            inst.db_instance_status = "available".to_string();
647                            inst.endpoint_address = r.endpoint_address.clone();
648                            inst.port = i32::from(r.endpoint_port);
649                            inst.host_port = r.host_port;
650                            inst.container_id = r.container_id;
651                            inst.db_instance_arn.clone()
652                        };
653                        emit_event_static_with_state(
654                            delivery_bus.as_ref(),
655                            Some(&state_handle),
656                            Some(&account_id),
657                            RdsSourceType::DbInstance,
658                            &id,
659                            &arn,
660                            "RDS-EVENT-0088",
661                            &["notification"],
662                            "DB instance started",
663                        );
664                        save_snapshot_static(state_handle.clone(), snapshot_store, snapshot_lock)
665                            .await;
666                    }
667                    Err(_) => {
668                        // Roll back to `stopped` so the next Describe doesn't
669                        // report a permanently-`starting` instance.
670                        let mut accounts = state_handle.write();
671                        let state = accounts.get_or_create(&account_id);
672                        if let Some(inst) = state.instances.get_mut(&id) {
673                            inst.db_instance_status = "stopped".to_string();
674                        }
675                    }
676                }
677            });
678        }
679
680        Ok(AwsResponse::xml(
681            StatusCode::OK,
682            query_response_xml(
683                "StartDBInstance",
684                RDS_NS,
685                &format!(
686                    "<DBInstance>{}</DBInstance>",
687                    db_instance_xml(&instance, Some("starting"))
688                ),
689                &request.request_id,
690            ),
691        ))
692    }
693}
694
/// Whether the given AWS error code is in the AddTagsToResource Smithy
/// model's declared error set. Used so undeclared `*NotFound` codes
/// (OptionGroup, ParameterGroup, EventSubscription, SecurityGroup) get
/// swallowed into a no-op rather than surfaced as a non-modelled error.
///
/// Despite the name, the set also includes the declared
/// `InvalidDBClusterEndpointStateFault`, `InvalidDBClusterStateFault` and
/// `InvalidDBInstanceState` codes, so this returns `true` for those too.
/// Matching is exact and case-sensitive.
fn is_declared_add_tags_not_found(code: &str) -> bool {
    matches!(
        code,
        "BlueGreenDeploymentNotFoundFault"
            | "DBClusterNotFoundFault"
            | "DBInstanceNotFound"
            | "DBProxyEndpointNotFoundFault"
            | "DBProxyNotFoundFault"
            | "DBProxyTargetGroupNotFoundFault"
            | "DBShardGroupNotFound"
            | "DBSnapshotNotFound"
            | "DBSnapshotTenantDatabaseNotFoundFault"
            | "IntegrationNotFoundFault"
            | "InvalidDBClusterEndpointStateFault"
            | "InvalidDBClusterStateFault"
            | "InvalidDBInstanceState"
            | "TenantDatabaseNotFound"
    )
}
721
722/// no store is configured (memory-mode runs).
723async fn save_snapshot_static(
724    state: SharedRdsState,
725    store: Option<Arc<dyn SnapshotStore>>,
726    lock: Arc<AsyncMutex<()>>,
727) {
728    let Some(store) = store else {
729        return;
730    };
731    let _guard = lock.lock().await;
732    let snapshot = RdsSnapshot {
733        schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
734        state: None,
735        accounts: Some(state.read().clone()),
736    };
737    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
738        let bytes = serde_json::to_vec(&snapshot)
739            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
740        store.save(&bytes)
741    })
742    .await;
743    match join {
744        Ok(Ok(())) => {}
745        Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
746        Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
747    }
748}
749
750impl RdsService {
751    /// Return the runtime or a ``ServiceUnavailable`` error if it was not configured.
752    ///
753    /// RDS operations that start, stop, or reach into a database container fail
754    /// with a consistent wire error when the daemon (Docker/Podman) is missing
755    /// rather than each caller restating the message.
756    /// Resolve the container runtime or return a declared error shape.
757    /// `InsufficientDBInstanceCapacity` is declared on every op that
758    /// calls `require_runtime` (Create/Modify/Restore* DB instance and
759    /// Read Replica), and is the closest Smithy-modelled analogue for
760    /// "fakecloud can't satisfy this DB request right now".
761    fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
762        self.runtime.as_ref().ok_or_else(|| {
763            AwsServiceError::aws_error(
764                StatusCode::SERVICE_UNAVAILABLE,
765                "InsufficientDBInstanceCapacity",
766                "Docker/Podman is required for RDS DB instances but is not available",
767            )
768        })
769    }
770
771    /// Build a hook that persists the current state when invoked, or `None` in
772    /// memory mode. The CloudFormation provisioner mutates `state` directly and
773    /// uses this to write a CFN-provisioned resource through to disk.
774    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
775        let store = self.snapshot_store.clone()?;
776        let state = self.state.clone();
777        let lock = self.snapshot_lock.clone();
778        Some(Arc::new(move || {
779            let state = state.clone();
780            let store = store.clone();
781            let lock = lock.clone();
782            Box::pin(async move {
783                save_snapshot_static(state, Some(store), lock).await;
784            })
785        }))
786    }
787}
788
789#[async_trait]
790impl AwsService for RdsService {
791    fn service_name(&self) -> &str {
792        "rds"
793    }
794
795    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
796        // Centralized Smithy-aligned validation. Returns the appropriate
797        // `MissingParameter` / `InvalidParameterValue` error before the
798        // per-action handler runs. Actions without a constraint entry
799        // fall through unchanged.
800        crate::validation::prevalidate(request.action.as_str(), &request)?;
801
802        let mutates = is_mutating_action(request.action.as_str());
803        let result = match request.action.as_str() {
804            "AddTagsToResource" => self.add_tags_to_resource(&request),
805            "CreateDBInstance" => self.create_db_instance(&request).await,
806            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
807            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
808            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
809            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
810            "DeleteDBInstance" => self.delete_db_instance(&request).await,
811            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
812            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
813            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
814            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
815            "DescribeDBInstances" => self.describe_db_instances(&request),
816            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
817            "DescribeDBParameters" => self.describe_db_parameters_real(&request),
818            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
819            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
820            "DescribeOrderableDBInstanceOptions" => {
821                self.describe_orderable_db_instance_options(&request)
822            }
823            "ListTagsForResource" => self.list_tags_for_resource(&request),
824            "ModifyDBInstance" => self.modify_db_instance(&request),
825            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
826            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
827            "RebootDBInstance" => self.reboot_db_instance(&request).await,
828            "StartDBInstance" => self.start_db_instance(&request).await,
829            "StopDBInstance" => self.stop_db_instance(&request).await,
830            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
831            "RestoreDBInstanceFromDBSnapshot" => {
832                self.restore_db_instance_from_db_snapshot(&request).await
833            }
834            "RestoreDBInstanceToPointInTime" => {
835                self.restore_db_instance_to_point_in_time(&request).await
836            }
837            "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
838            "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
839            "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
840            "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
841            "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
842            "RestoreDBClusterToPointInTime" => {
843                self.restore_db_cluster_to_point_in_time(&request).await
844            }
845            _ => self.handle_extra_action(&request),
846        };
847        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
848            self.save_snapshot().await;
849        }
850        result
851    }
852
853    fn supported_actions(&self) -> &[&str] {
854        SUPPORTED_ACTIONS
855    }
856}
857
860/// Render a single user-set parameter as the XML shape AWS emits inside
861/// `DescribeDB(Cluster)Parameters` responses. We don't store metadata
862/// alongside user values so we report `dynamic`/`string` defaults.
863pub(crate) fn render_user_parameter_xml(name: &str, value: &str, apply_method: &str) -> String {
864    format!(
865        "      <Parameter>\n        <ParameterName>{}</ParameterName>\n        <ParameterValue>{}</ParameterValue>\n        <Source>user</Source>\n        <ApplyType>dynamic</ApplyType>\n        <ApplyMethod>{}</ApplyMethod>\n        <DataType>string</DataType>\n        <IsModifiable>true</IsModifiable>\n      </Parameter>\n",
866        xml_escape(name),
867        xml_escape(value),
868        xml_escape(apply_method),
869    )
870}
871
872/// Render a single engine-default parameter as the XML shape AWS emits
873/// inside `DescribeDB(Cluster)Parameters` and
874/// `DescribeEngineDefault(Cluster)Parameters` responses.
875pub(crate) fn render_engine_default_parameter_xml(
876    default: &crate::state::EngineDefaultParameter,
877) -> String {
878    format!(
879        "      <Parameter>\n        <ParameterName>{}</ParameterName>\n        <ParameterValue>{}</ParameterValue>\n        <Source>engine-default</Source>\n        <ApplyType>{}</ApplyType>\n        <DataType>{}</DataType>\n        <AllowedValues>{}</AllowedValues>\n        <IsModifiable>{}</IsModifiable>\n      </Parameter>\n",
880        xml_escape(default.name),
881        xml_escape(default.value),
882        xml_escape(default.apply_type),
883        xml_escape(default.data_type),
884        xml_escape(default.allowed_values),
885        default.is_modifiable,
886    )
887}
888
889/// Parse `Parameters.{Parameter|member}.N.{ParameterName,ParameterValue,ApplyMethod}`
890/// from a Query-protocol request. AWS RDS uses `Parameters.Parameter.N`
891/// (the `Parameter` list location name from the Smithy model); we also
892/// accept the generic `Parameters.member.N` form so hand-built clients
893/// using the default Query list shape keep working. Skips members
894/// missing a name or value. `ApplyMethod` defaults to `immediate` (AWS's
895/// default) and is preserved so a `Describe` round-trip echoes it back —
896/// the Terraform provider sets `apply_method = "immediate"` by default and
897/// drifts if the read-back omits it.
898pub(crate) struct DbParameterInput {
899    pub name: String,
900    pub value: String,
901    pub apply_method: String,
902}
903
904pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<DbParameterInput> {
905    let mut out = Vec::new();
906    for prefix in ["Parameters.Parameter", "Parameters.member"] {
907        let mut index = 1;
908        loop {
909            let name_key = format!("{prefix}.{index}.ParameterName");
910            let value_key = format!("{prefix}.{index}.ParameterValue");
911            let apply_key = format!("{prefix}.{index}.ApplyMethod");
912            let name = optional_query_param(request, &name_key);
913            let value = optional_query_param(request, &value_key);
914            if name.is_none() && value.is_none() {
915                break;
916            }
917            if let (Some(n), Some(v)) = (name, value) {
918                if !n.is_empty() {
919                    let apply_method = optional_query_param(request, &apply_key)
920                        .filter(|m| !m.is_empty())
921                        .unwrap_or_else(|| "immediate".to_string());
922                    out.push(DbParameterInput {
923                        name: n,
924                        value: v,
925                        apply_method,
926                    });
927                }
928            }
929            index += 1;
930        }
931    }
932    out
933}
934
/// Translate an AWS-style RDS log file name (e.g. `error/postgres.log`) into
/// the absolute path of the matching file inside the running container.
///
/// The Postgres log names map to the single Postgres log file for any engine.
/// The MySQL error and slow-query names are only translated when `engine` is
/// `mysql` or `mariadb`. Every other name, including MySQL names on other
/// engines, is returned unchanged, so callers can also fetch arbitrary paths.
fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
    const POSTGRES_LOG: &str = "/var/log/postgresql/postgresql.log";
    let mysql_family = matches!(engine, "mysql" | "mariadb");
    let container_path = match log_file_name {
        "error/postgres.log" | "trace/postgres-trace.log" => POSTGRES_LOG,
        "error/mysql-error.log" if mysql_family => "/var/log/mysql/error.log",
        "slowquery/mysql-slowquery.log" if mysql_family => "/var/log/mysql/slow.log",
        passthrough => passthrough,
    };
    container_path.to_owned()
}
949
/// One page of a paginated listing.
pub(crate) struct PaginationResult<T> {
    /// The entries belonging to this page.
    items: Vec<T>,
    /// Continuation token for the following page. This is presumably the value
    /// echoed back to clients as the Query `Marker`, and `None` presumably
    /// means no further pages exist; confirm against the paginating helper.
    next_marker: Option<String>,
}
954
955/// Attach `instance_id` to the cluster's `DBClusterMembers` array,
956/// promoting it to writer when the cluster has none. Idempotent:
957/// re-attaching an existing member is a no-op.
958fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
959    use serde_json::{json, Value};
960    let Some(map) = state.extras.get_mut("clusters") else {
961        return;
962    };
963    let Some(entry) = map.get_mut(cluster_id) else {
964        return;
965    };
966    let Some(obj) = entry.as_object_mut() else {
967        return;
968    };
969    let mut members: Vec<Value> = obj
970        .get("DBClusterMembers")
971        .and_then(|v| v.as_array())
972        .cloned()
973        .unwrap_or_default();
974    if members
975        .iter()
976        .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
977    {
978        return;
979    }
980    let has_writer = members
981        .iter()
982        .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
983    let promotion_tier = (members.len() as i64) + 1;
984    members.push(json!({
985        "DBInstanceIdentifier": instance_id,
986        "IsClusterWriter": !has_writer,
987        "DBClusterParameterGroupStatus": "in-sync",
988        "PromotionTier": promotion_tier,
989    }));
990    obj.insert("DBClusterMembers".to_string(), Value::Array(members));
991    if !has_writer {
992        obj.insert(
993            "WriterDBInstanceIdentifier".to_string(),
994            Value::String(instance_id.to_string()),
995        );
996    }
997}
998
999#[path = "../service_helpers.rs"]
1000mod service_helpers;
1001pub(crate) use service_helpers::*;
1002
1003#[cfg(test)]
1004#[path = "../service_tests.rs"]
1005mod tests;