1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18 default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19 DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20 SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
/// XML namespace URI handed to `query_response_xml` when this file renders RDS
/// query responses (the URI embeds the `2014-10-31` API version).
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
/// Action names returned by `AwsService::supported_actions` for the RDS service.
///
/// Only a subset of these is dispatched by name in `handle`; every other action
/// falls through to `handle_extra_action`.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddRoleToDBCluster",
    "AddRoleToDBInstance",
    "AddSourceIdentifierToSubscription",
    "AddTagsToResource",
    "ApplyPendingMaintenanceAction",
    "AuthorizeDBSecurityGroupIngress",
    "BacktrackDBCluster",
    "CancelExportTask",
    "CopyDBClusterParameterGroup",
    "CopyDBClusterSnapshot",
    "CopyDBParameterGroup",
    "CopyDBSnapshot",
    "CopyOptionGroup",
    "CreateBlueGreenDeployment",
    "CreateCustomDBEngineVersion",
    "CreateDBCluster",
    "CreateDBClusterEndpoint",
    "CreateDBClusterParameterGroup",
    "CreateDBClusterSnapshot",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBProxy",
    "CreateDBProxyEndpoint",
    "CreateDBSecurityGroup",
    "CreateDBShardGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "CreateEventSubscription",
    "CreateGlobalCluster",
    "CreateIntegration",
    "CreateOptionGroup",
    "CreateTenantDatabase",
    "DeleteBlueGreenDeployment",
    "DeleteCustomDBEngineVersion",
    "DeleteDBCluster",
    "DeleteDBClusterAutomatedBackup",
    "DeleteDBClusterEndpoint",
    "DeleteDBClusterParameterGroup",
    "DeleteDBClusterSnapshot",
    "DeleteDBInstance",
    "DeleteDBInstanceAutomatedBackup",
    "DeleteDBParameterGroup",
    "DeleteDBProxy",
    "DeleteDBProxyEndpoint",
    "DeleteDBSecurityGroup",
    "DeleteDBShardGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DeleteEventSubscription",
    "DeleteGlobalCluster",
    "DeleteIntegration",
    "DeleteOptionGroup",
    "DeleteTenantDatabase",
    "DeregisterDBProxyTargets",
    "DescribeAccountAttributes",
    "DescribeBlueGreenDeployments",
    "DescribeCertificates",
    "DescribeDBClusterAutomatedBackups",
    "DescribeDBClusterBacktracks",
    "DescribeDBClusterEndpoints",
    "DescribeDBClusterParameterGroups",
    "DescribeDBClusterParameters",
    "DescribeDBClusterSnapshotAttributes",
    "DescribeDBClusterSnapshots",
    "DescribeDBClusters",
    "DescribeDBEngineVersions",
    "DescribeDBInstanceAutomatedBackups",
    "DescribeDBInstances",
    "DescribeDBLogFiles",
    "DescribeDBMajorEngineVersions",
    "DescribeDBParameterGroups",
    "DescribeDBParameters",
    "DescribeDBProxies",
    "DescribeDBProxyEndpoints",
    "DescribeDBProxyTargetGroups",
    "DescribeDBProxyTargets",
    "DescribeDBRecommendations",
    "DescribeDBSecurityGroups",
    "DescribeDBShardGroups",
    "DescribeDBSnapshotAttributes",
    "DescribeDBSnapshotTenantDatabases",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeEngineDefaultClusterParameters",
    "DescribeEngineDefaultParameters",
    "DescribeEventCategories",
    "DescribeEventSubscriptions",
    "DescribeEvents",
    "DescribeExportTasks",
    "DescribeGlobalClusters",
    "DescribeIntegrations",
    "DescribeOptionGroupOptions",
    "DescribeOptionGroups",
    "DescribeOrderableDBInstanceOptions",
    "DescribePendingMaintenanceActions",
    "DescribeReservedDBInstances",
    "DescribeReservedDBInstancesOfferings",
    "DescribeServerlessV2PlatformVersions",
    "DescribeSourceRegions",
    "DescribeTenantDatabases",
    "DescribeValidDBInstanceModifications",
    "DisableHttpEndpoint",
    "DownloadDBLogFilePortion",
    "EnableHttpEndpoint",
    "FailoverDBCluster",
    "FailoverGlobalCluster",
    "ListTagsForResource",
    "ModifyActivityStream",
    "ModifyCertificates",
    "ModifyCurrentDBClusterCapacity",
    "ModifyCustomDBEngineVersion",
    "ModifyDBCluster",
    "ModifyDBClusterEndpoint",
    "ModifyDBClusterParameterGroup",
    "ModifyDBClusterSnapshotAttribute",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBProxy",
    "ModifyDBProxyEndpoint",
    "ModifyDBProxyTargetGroup",
    "ModifyDBRecommendation",
    "ModifyDBShardGroup",
    "ModifyDBSnapshot",
    "ModifyDBSnapshotAttribute",
    "ModifyDBSubnetGroup",
    "ModifyEventSubscription",
    "ModifyGlobalCluster",
    "ModifyIntegration",
    "ModifyOptionGroup",
    "ModifyTenantDatabase",
    "PromoteReadReplica",
    "PromoteReadReplicaDBCluster",
    "PurchaseReservedDBInstancesOffering",
    "RebootDBCluster",
    "RebootDBInstance",
    "RebootDBShardGroup",
    "RegisterDBProxyTargets",
    "RemoveFromGlobalCluster",
    "RemoveRoleFromDBCluster",
    "RemoveRoleFromDBInstance",
    "RemoveSourceIdentifierFromSubscription",
    "RemoveTagsFromResource",
    "ResetDBClusterParameterGroup",
    "ResetDBParameterGroup",
    "RestoreDBClusterFromS3",
    "RestoreDBClusterFromSnapshot",
    "RestoreDBClusterToPointInTime",
    "RestoreDBInstanceFromDBSnapshot",
    "RestoreDBInstanceFromS3",
    "RestoreDBInstanceToPointInTime",
    "RevokeDBSecurityGroupIngress",
    "StartActivityStream",
    "StartDBCluster",
    "StartDBInstance",
    "StartDBInstanceAutomatedBackupsReplication",
    "StartExportTask",
    "StopActivityStream",
    "StopDBCluster",
    "StopDBInstance",
    "StopDBInstanceAutomatedBackupsReplication",
    "SwitchoverBlueGreenDeployment",
    "SwitchoverGlobalCluster",
    "SwitchoverReadReplica",
];
191
/// RDS service front-end: owns the shared state handle plus the optional
/// collaborators (container runtime, snapshot store, delivery bus) that the
/// `with_*` builder methods attach.
pub struct RdsService {
    /// Shared, lock-protected RDS state (accessed via `read()` / `write()`).
    pub(crate) state: SharedRdsState,
    /// Container runtime used to start/stop backing databases; `None` until
    /// `with_runtime` is called.
    runtime: Option<Arc<RdsRuntime>>,
    /// Destination for serialized snapshots; when `None`, snapshot saving is a no-op.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex taken by `save_snapshot_static` while a snapshot is written.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Optional bus forwarded to the event-emission helpers.
    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
}
199
/// Kind of RDS resource an event is about.
#[derive(Clone, Copy)]
#[allow(dead_code, clippy::enum_variant_names)]
pub(crate) enum RdsSourceType {
    DbInstance,
    DbSnapshot,
    DbParameterGroup,
    DbCluster,
    DbClusterSnapshot,
}

impl RdsSourceType {
    /// Single lookup table backing the three string accessors below.
    ///
    /// Returns `(upper-snake identifier, hyphenated identifier, detail-type label)`.
    const fn labels(self) -> (&'static str, &'static str, &'static str) {
        match self {
            Self::DbInstance => ("DB_INSTANCE", "db-instance", "RDS DB Instance Event"),
            Self::DbSnapshot => ("DB_SNAPSHOT", "db-snapshot", "RDS DB Snapshot Event"),
            Self::DbParameterGroup => (
                "DB_PARAMETER_GROUP",
                "db-parameter-group",
                "RDS DB Parameter Group Event",
            ),
            Self::DbCluster => ("DB_CLUSTER", "db-cluster", "RDS DB Cluster Event"),
            Self::DbClusterSnapshot => (
                "DB_CLUSTER_SNAPSHOT",
                "db-cluster-snapshot",
                "RDS DB Cluster Snapshot Event",
            ),
        }
    }

    /// Upper-snake-case identifier, e.g. `DB_INSTANCE`.
    fn as_str(self) -> &'static str {
        let (upper_snake, _, _) = self.labels();
        upper_snake
    }

    /// Lower-case hyphenated identifier, e.g. `db-instance`.
    pub(crate) fn describe_events_str(self) -> &'static str {
        let (_, hyphenated, _) = self.labels();
        hyphenated
    }

    /// Human-readable label of the form `RDS … Event`.
    fn detail_type(self) -> &'static str {
        let (_, _, detail) = self.labels();
        detail
    }
}
248
249mod cluster_snapshots;
250mod engine;
251mod instances;
252mod log_files;
253mod parameter_groups;
254mod replicas;
255mod restore;
256mod snapshots;
257mod subnet_groups;
258mod tags;
259
260impl RdsService {
261 pub(crate) fn state_handle(&self) -> &SharedRdsState {
262 &self.state
263 }
264}
265
266impl RdsService {
267 pub fn new(state: SharedRdsState) -> Self {
268 Self {
269 state,
270 runtime: None,
271 snapshot_store: None,
272 snapshot_lock: Arc::new(AsyncMutex::new(())),
273 delivery_bus: None,
274 }
275 }
276
277 pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
278 self.runtime = Some(runtime);
279 self
280 }
281
282 pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
286 self.runtime.as_ref()
287 }
288
289 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
290 self.snapshot_store = Some(store);
291 self
292 }
293
294 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
295 self.delivery_bus = Some(bus);
296 self
297 }
298
299 pub(crate) fn emit_event(
304 &self,
305 source_type: RdsSourceType,
306 source_identifier: &str,
307 source_arn: &str,
308 event_id: &str,
309 event_categories: &[&str],
310 message: &str,
311 ) {
312 let account_id = source_arn.split(':').nth(4).unwrap_or("");
315 emit_event_static_with_state(
316 self.delivery_bus.as_ref(),
317 Some(&self.state),
318 if account_id.is_empty() {
319 None
320 } else {
321 Some(account_id)
322 },
323 source_type,
324 source_identifier,
325 source_arn,
326 event_id,
327 event_categories,
328 message,
329 );
330 }
331
332 async fn save_snapshot(&self) {
333 save_snapshot_static(
334 self.state.clone(),
335 self.snapshot_store.clone(),
336 self.snapshot_lock.clone(),
337 )
338 .await;
339 }
340
341 pub async fn recover_persisted_containers(&self) {
350 let Some(runtime) = self.runtime.clone() else {
351 return;
352 };
353
354 struct Pending {
355 account_id: String,
356 region: String,
357 id: String,
358 arn: String,
359 engine: String,
360 engine_version: String,
361 username: String,
362 password: String,
363 db_name: String,
364 tags: Vec<crate::state::RdsTag>,
365 }
366
367 let pending: Vec<Pending> = {
368 let mut accounts = self.state.write();
369 let mut out = Vec::new();
370 for (_, state) in accounts.iter_mut() {
371 let account_id = state.account_id.clone();
372 let region = state.region.clone();
373 for (id, inst) in state.instances.iter_mut() {
374 if !matches!(
382 inst.db_instance_status.as_str(),
383 "creating"
384 | "available"
385 | "starting"
386 | "modifying"
387 | "rebooting"
388 | "backing-up"
389 ) {
390 continue;
391 }
392 inst.db_instance_status = "starting".to_string();
393 out.push(Pending {
394 account_id: account_id.clone(),
395 region: region.clone(),
396 id: id.clone(),
397 arn: inst.db_instance_arn.clone(),
398 engine: inst.engine.clone(),
399 engine_version: inst.engine_version.clone(),
400 username: inst.master_username.clone(),
401 password: inst.master_user_password.clone(),
402 db_name: inst
403 .db_name
404 .clone()
405 .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
406 tags: inst.tags.clone(),
407 });
408 }
409 }
410 out
411 };
412
413 if pending.is_empty() {
414 return;
415 }
416 tracing::info!(
417 count = pending.len(),
418 "recovering backing containers for persisted rds instances",
419 );
420
421 for p in pending {
422 let runtime = runtime.clone();
423 let state = self.state.clone();
424 let snapshot_store = self.snapshot_store.clone();
425 let snapshot_lock = self.snapshot_lock.clone();
426 let delivery_bus = self.delivery_bus.clone();
427 tokio::spawn(async move {
428 match runtime
429 .ensure_postgres(
430 &p.id,
431 &p.engine,
432 &p.engine_version,
433 &p.username,
434 &p.password,
435 &p.db_name,
436 &p.account_id,
437 &p.region,
438 &p.tags,
439 )
440 .await
441 {
442 Ok(running) => {
443 {
444 let mut accounts = state.write();
445 if let Some(s) = accounts.get_mut(&p.account_id) {
446 if let Some(inst) = s.instances.get_mut(&p.id) {
447 inst.db_instance_status = "available".to_string();
448 inst.endpoint_address = running.endpoint_address.clone();
449 inst.port = i32::from(running.endpoint_port);
450 inst.host_port = running.host_port;
451 inst.container_id = running.container_id;
452 }
453 }
454 }
455 save_snapshot_static(
456 state.clone(),
457 snapshot_store.clone(),
458 snapshot_lock.clone(),
459 )
460 .await;
461 emit_event_static(
462 delivery_bus.as_ref(),
463 RdsSourceType::DbInstance,
464 &p.id,
465 &p.arn,
466 "RDS-EVENT-0088",
467 &["notification"],
468 "DB instance restarted after fakecloud restart",
469 );
470 }
471 Err(error) => {
472 tracing::error!(
473 %error,
474 db_instance_identifier = %p.id,
475 "failed to recover rds backing container after restart",
476 );
477 {
478 let mut accounts = state.write();
479 if let Some(s) = accounts.get_mut(&p.account_id) {
480 if let Some(inst) = s.instances.get_mut(&p.id) {
481 inst.db_instance_status = "failed".to_string();
482 }
483 }
484 }
485 save_snapshot_static(state, snapshot_store, snapshot_lock).await;
486 }
487 }
488 });
489 }
490 }
491
492 async fn stop_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
498 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
499
500 let arn = {
505 let accounts = self.state.read();
506 let empty = RdsState::new(&request.account_id, &request.region);
507 let state = accounts.get(&request.account_id).unwrap_or(&empty);
508 state
509 .instances
510 .get(&db_instance_identifier)
511 .map(|i| i.db_instance_arn.clone())
512 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
513 };
514
515 if let Some(runtime) = self.runtime.as_ref() {
516 runtime.stop_container(&db_instance_identifier).await;
517 }
518
519 let instance = {
520 let mut accounts = self.state.write();
521 let state = accounts.get_or_create(&request.account_id);
522 let inst = state
523 .instances
524 .get_mut(&db_instance_identifier)
525 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
526 inst.db_instance_status = "stopped".to_string();
527 inst.container_id = String::new();
528 inst.clone()
529 };
530
531 self.emit_event(
532 RdsSourceType::DbInstance,
533 &db_instance_identifier,
534 &arn,
535 "RDS-EVENT-0089",
536 &["notification"],
537 "DB instance stopped",
538 );
539
540 Ok(AwsResponse::xml(
541 StatusCode::OK,
542 query_response_xml(
543 "StopDBInstance",
544 RDS_NS,
545 &format!(
546 "<DBInstance>{}</DBInstance>",
547 db_instance_xml(&instance, Some("stopped"))
548 ),
549 &request.request_id,
550 ),
551 ))
552 }
553
554 async fn start_db_instance(
557 &self,
558 request: &AwsRequest,
559 ) -> Result<AwsResponse, AwsServiceError> {
560 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
561
562 let instance = {
566 let accounts = self.state.read();
567 let empty = RdsState::new(&request.account_id, &request.region);
568 let state = accounts.get(&request.account_id).unwrap_or(&empty);
569 state
570 .instances
571 .get(&db_instance_identifier)
572 .cloned()
573 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
574 };
575
576 {
579 let mut accounts = self.state.write();
580 let state = accounts.get_or_create(&request.account_id);
581 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
582 inst.db_instance_status = "starting".to_string();
583 }
584 }
585
586 let Some(runtime) = self.runtime.clone() else {
591 {
592 let mut accounts = self.state.write();
593 let state = accounts.get_or_create(&request.account_id);
594 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
595 inst.db_instance_status = "stopped".to_string();
596 }
597 }
598 return Err(AwsServiceError::aws_error(
599 StatusCode::SERVICE_UNAVAILABLE,
600 "InternalFailure",
601 "Container runtime is not configured; cannot start DB instance",
602 ));
603 };
604
605 {
612 let state_handle = self.state.clone();
613 let delivery_bus = self.delivery_bus.clone();
614 let snapshot_store = self.snapshot_store.clone();
615 let snapshot_lock = self.snapshot_lock.clone();
616 let id = db_instance_identifier.clone();
617 let account_id = request.account_id.clone();
618 let region = request.region.clone();
619 let inst = instance.clone();
620 tokio::spawn(async move {
621 let logical_db = inst
622 .db_name
623 .clone()
624 .unwrap_or_else(|| default_db_name(&inst.engine).to_string());
625 match runtime
626 .ensure_postgres(
627 &id,
628 &inst.engine,
629 &inst.engine_version,
630 &inst.master_username,
631 &inst.master_user_password,
632 &logical_db,
633 &account_id,
634 ®ion,
635 &inst.tags,
636 )
637 .await
638 {
639 Ok(r) => {
640 let arn = {
641 let mut accounts = state_handle.write();
642 let state = accounts.get_or_create(&account_id);
643 let Some(inst) = state.instances.get_mut(&id) else {
644 return;
645 };
646 inst.db_instance_status = "available".to_string();
647 inst.endpoint_address = r.endpoint_address.clone();
648 inst.port = i32::from(r.endpoint_port);
649 inst.host_port = r.host_port;
650 inst.container_id = r.container_id;
651 inst.db_instance_arn.clone()
652 };
653 emit_event_static_with_state(
654 delivery_bus.as_ref(),
655 Some(&state_handle),
656 Some(&account_id),
657 RdsSourceType::DbInstance,
658 &id,
659 &arn,
660 "RDS-EVENT-0088",
661 &["notification"],
662 "DB instance started",
663 );
664 save_snapshot_static(state_handle.clone(), snapshot_store, snapshot_lock)
665 .await;
666 }
667 Err(_) => {
668 let mut accounts = state_handle.write();
671 let state = accounts.get_or_create(&account_id);
672 if let Some(inst) = state.instances.get_mut(&id) {
673 inst.db_instance_status = "stopped".to_string();
674 }
675 }
676 }
677 });
678 }
679
680 Ok(AwsResponse::xml(
681 StatusCode::OK,
682 query_response_xml(
683 "StartDBInstance",
684 RDS_NS,
685 &format!(
686 "<DBInstance>{}</DBInstance>",
687 db_instance_xml(&instance, Some("starting"))
688 ),
689 &request.request_id,
690 ),
691 ))
692 }
693}
694
/// Returns `true` when `code` is exactly one of the error codes listed below
/// (comparison is case-sensitive).
fn is_declared_add_tags_not_found(code: &str) -> bool {
    const DECLARED_CODES: &[&str] = &[
        "BlueGreenDeploymentNotFoundFault",
        "DBClusterNotFoundFault",
        "DBInstanceNotFound",
        "DBProxyEndpointNotFoundFault",
        "DBProxyNotFoundFault",
        "DBProxyTargetGroupNotFoundFault",
        "DBShardGroupNotFound",
        "DBSnapshotNotFound",
        "DBSnapshotTenantDatabaseNotFoundFault",
        "IntegrationNotFoundFault",
        "InvalidDBClusterEndpointStateFault",
        "InvalidDBClusterStateFault",
        "InvalidDBInstanceState",
        "TenantDatabaseNotFound",
    ];
    DECLARED_CODES.iter().any(|declared| *declared == code)
}
721
722async fn save_snapshot_static(
724 state: SharedRdsState,
725 store: Option<Arc<dyn SnapshotStore>>,
726 lock: Arc<AsyncMutex<()>>,
727) {
728 let Some(store) = store else {
729 return;
730 };
731 let _guard = lock.lock().await;
732 let snapshot = RdsSnapshot {
733 schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
734 state: None,
735 accounts: Some(state.read().clone()),
736 };
737 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
738 let bytes = serde_json::to_vec(&snapshot)
739 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
740 store.save(&bytes)
741 })
742 .await;
743 match join {
744 Ok(Ok(())) => {}
745 Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
746 Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
747 }
748}
749
750impl RdsService {
751 fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
762 self.runtime.as_ref().ok_or_else(|| {
763 AwsServiceError::aws_error(
764 StatusCode::SERVICE_UNAVAILABLE,
765 "InsufficientDBInstanceCapacity",
766 "Docker/Podman is required for RDS DB instances but is not available",
767 )
768 })
769 }
770
771 #[allow(clippy::too_many_arguments)]
781 fn spawn_finalize_restored_instance(
782 &self,
783 runtime: Arc<RdsRuntime>,
784 account_id: String,
785 region: String,
786 id: String,
787 arn: String,
788 engine: String,
789 engine_version: String,
790 master_username: String,
791 master_user_password: String,
792 logical_db: String,
793 tags: Vec<RdsTag>,
794 dump: Option<Vec<u8>>,
795 created_event: (&'static str, &'static str),
796 ) {
797 let state_handle = self.state.clone();
798 let delivery_bus = self.delivery_bus.clone();
799 let snapshot_store = self.snapshot_store.clone();
800 let snapshot_lock = self.snapshot_lock.clone();
801 let (event_id, event_message) = created_event;
802 async fn fail(
805 state_handle: &SharedRdsState,
806 snapshot_store: Option<Arc<dyn SnapshotStore>>,
807 snapshot_lock: Arc<AsyncMutex<()>>,
808 delivery_bus: Option<&Arc<DeliveryBus>>,
809 runtime: &Arc<RdsRuntime>,
810 account_id: &str,
811 id: &str,
812 arn: &str,
813 error: &str,
814 ) {
815 tracing::error!(%error, db_instance_identifier=%id, "restore/replica background finalize failed");
816 {
817 let mut accounts = state_handle.write();
818 let state = accounts.get_or_create(account_id);
819 state.instances.remove(id);
820 for inst in state.instances.values_mut() {
824 inst.read_replica_db_instance_identifiers
825 .retain(|r| r != id);
826 }
827 }
828 runtime.stop_container(id).await;
829 save_snapshot_static(state_handle.clone(), snapshot_store, snapshot_lock).await;
830 emit_event_static(
831 delivery_bus,
832 RdsSourceType::DbInstance,
833 id,
834 arn,
835 "RDS-EVENT-0058",
836 &["failure"],
837 &format!("DB instance failed to create: {error}"),
838 );
839 }
840 tokio::spawn(async move {
841 let running = match runtime
842 .ensure_postgres(
843 &id,
844 &engine,
845 &engine_version,
846 &master_username,
847 &master_user_password,
848 &logical_db,
849 &account_id,
850 ®ion,
851 &tags,
852 )
853 .await
854 {
855 Ok(running) => running,
856 Err(error) => {
857 fail(
858 &state_handle,
859 snapshot_store,
860 snapshot_lock,
861 delivery_bus.as_ref(),
862 &runtime,
863 &account_id,
864 &id,
865 &arn,
866 &error.to_string(),
867 )
868 .await;
869 return;
870 }
871 };
872
873 if let Some(dump) = dump {
874 if let Err(error) = runtime
875 .restore_database(
876 &id,
877 &engine,
878 &master_username,
879 &master_user_password,
880 &logical_db,
881 &dump,
882 )
883 .await
884 {
885 fail(
888 &state_handle,
889 snapshot_store,
890 snapshot_lock,
891 delivery_bus.as_ref(),
892 &runtime,
893 &account_id,
894 &id,
895 &arn,
896 &error.to_string(),
897 )
898 .await;
899 return;
900 }
901 }
902
903 let instance_present = {
904 let mut accounts = state_handle.write();
905 let state = accounts.get_or_create(&account_id);
906 if let Some(inst) = state.instances.get_mut(&id) {
907 inst.db_instance_status = "available".to_string();
908 inst.endpoint_address = running.endpoint_address.clone();
909 inst.port = i32::from(running.endpoint_port);
910 inst.host_port = running.host_port;
911 inst.container_id = running.container_id.clone();
912 true
913 } else {
914 false
915 }
916 };
917 if !instance_present {
918 runtime.stop_container(&id).await;
920 save_snapshot_static(state_handle.clone(), snapshot_store, snapshot_lock).await;
921 return;
922 }
923 emit_event_static_with_state(
924 delivery_bus.as_ref(),
925 Some(&state_handle),
926 Some(&account_id),
927 RdsSourceType::DbInstance,
928 &id,
929 &arn,
930 event_id,
931 &["creation"],
932 event_message,
933 );
934 save_snapshot_static(state_handle.clone(), snapshot_store, snapshot_lock).await;
935 });
936 }
937
938 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
942 let store = self.snapshot_store.clone()?;
943 let state = self.state.clone();
944 let lock = self.snapshot_lock.clone();
945 Some(Arc::new(move || {
946 let state = state.clone();
947 let store = store.clone();
948 let lock = lock.clone();
949 Box::pin(async move {
950 save_snapshot_static(state, Some(store), lock).await;
951 })
952 }))
953 }
954}
955
956#[async_trait]
957impl AwsService for RdsService {
958 fn service_name(&self) -> &str {
959 "rds"
960 }
961
962 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
963 crate::validation::prevalidate(request.action.as_str(), &request)?;
968
969 let mutates = is_mutating_action(request.action.as_str());
970 let result = match request.action.as_str() {
971 "AddTagsToResource" => self.add_tags_to_resource(&request),
972 "CreateDBInstance" => self.create_db_instance(&request).await,
973 "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
974 "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
975 "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
976 "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
977 "DeleteDBInstance" => self.delete_db_instance(&request).await,
978 "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
979 "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
980 "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
981 "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
982 "DescribeDBInstances" => self.describe_db_instances(&request),
983 "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
984 "DescribeDBParameters" => self.describe_db_parameters_real(&request),
985 "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
986 "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
987 "DescribeOrderableDBInstanceOptions" => {
988 self.describe_orderable_db_instance_options(&request)
989 }
990 "ListTagsForResource" => self.list_tags_for_resource(&request),
991 "ModifyDBInstance" => self.modify_db_instance(&request),
992 "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
993 "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
994 "RebootDBInstance" => self.reboot_db_instance(&request).await,
995 "StartDBInstance" => self.start_db_instance(&request).await,
996 "StopDBInstance" => self.stop_db_instance(&request).await,
997 "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
998 "RestoreDBInstanceFromDBSnapshot" => {
999 self.restore_db_instance_from_db_snapshot(&request).await
1000 }
1001 "RestoreDBInstanceToPointInTime" => {
1002 self.restore_db_instance_to_point_in_time(&request).await
1003 }
1004 "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
1005 "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
1006 "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
1007 "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
1008 "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
1009 "RestoreDBClusterToPointInTime" => {
1010 self.restore_db_cluster_to_point_in_time(&request).await
1011 }
1012 _ => self.handle_extra_action(&request),
1013 };
1014 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
1015 self.save_snapshot().await;
1016 }
1017 result
1018 }
1019
1020 fn supported_actions(&self) -> &[&str] {
1021 SUPPORTED_ACTIONS
1022 }
1023}
1024
// NOTE(review): this inherent impl block is empty in this file; presumably its
// methods were moved elsewhere. Confirm before removing it.
impl RdsService {}
1026
1027pub(crate) fn render_user_parameter_xml(name: &str, value: &str, apply_method: &str) -> String {
1031 format!(
1032 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>user</Source>\n <ApplyType>dynamic</ApplyType>\n <ApplyMethod>{}</ApplyMethod>\n <DataType>string</DataType>\n <IsModifiable>true</IsModifiable>\n </Parameter>\n",
1033 xml_escape(name),
1034 xml_escape(value),
1035 xml_escape(apply_method),
1036 )
1037}
1038
1039pub(crate) fn render_engine_default_parameter_xml(
1043 default: &crate::state::EngineDefaultParameter,
1044) -> String {
1045 format!(
1046 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>engine-default</Source>\n <ApplyType>{}</ApplyType>\n <DataType>{}</DataType>\n <AllowedValues>{}</AllowedValues>\n <IsModifiable>{}</IsModifiable>\n </Parameter>\n",
1047 xml_escape(default.name),
1048 xml_escape(default.value),
1049 xml_escape(default.apply_type),
1050 xml_escape(default.data_type),
1051 xml_escape(default.allowed_values),
1052 default.is_modifiable,
1053 )
1054}
1055
/// One parameter entry parsed from a request by `parse_db_parameter_members`.
pub(crate) struct DbParameterInput {
    /// Value of the `ParameterName` query field (never empty when produced by
    /// `parse_db_parameter_members`).
    pub name: String,
    /// Value of the `ParameterValue` query field.
    pub value: String,
    /// Value of the `ApplyMethod` query field; `parse_db_parameter_members`
    /// substitutes `"immediate"` when it is missing or empty.
    pub apply_method: String,
}
1070
1071pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<DbParameterInput> {
1072 let mut out = Vec::new();
1073 for prefix in ["Parameters.Parameter", "Parameters.member"] {
1074 let mut index = 1;
1075 loop {
1076 let name_key = format!("{prefix}.{index}.ParameterName");
1077 let value_key = format!("{prefix}.{index}.ParameterValue");
1078 let apply_key = format!("{prefix}.{index}.ApplyMethod");
1079 let name = optional_query_param(request, &name_key);
1080 let value = optional_query_param(request, &value_key);
1081 if name.is_none() && value.is_none() {
1082 break;
1083 }
1084 if let (Some(n), Some(v)) = (name, value) {
1085 if !n.is_empty() {
1086 let apply_method = optional_query_param(request, &apply_key)
1087 .filter(|m| !m.is_empty())
1088 .unwrap_or_else(|| "immediate".to_string());
1089 out.push(DbParameterInput {
1090 name: n,
1091 value: v,
1092 apply_method,
1093 });
1094 }
1095 }
1096 index += 1;
1097 }
1098 }
1099 out
1100}
1101
/// Maps a log file name to a path inside the backing container.
///
/// The two postgres log names map to the postgres log path for any engine.
/// The mysql error and slow-query names map only when `engine` is `mysql` or
/// `mariadb`. Any other combination returns `log_file_name` unchanged.
fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
    let is_mysql_family = matches!(engine, "mysql" | "mariadb");
    let container_path = match log_file_name {
        "error/postgres.log" | "trace/postgres-trace.log" => "/var/log/postgresql/postgresql.log",
        "error/mysql-error.log" if is_mysql_family => "/var/log/mysql/error.log",
        "slowquery/mysql-slowquery.log" if is_mysql_family => "/var/log/mysql/slow.log",
        unmapped => unmapped,
    };
    container_path.to_string()
}
1116
/// One page of results plus the marker for the following page.
pub(crate) struct PaginationResult<T> {
    /// Items on this page.
    items: Vec<T>,
    /// Marker for the next page — presumably `None` when there are no more
    /// results; confirm against the code that builds this struct.
    next_marker: Option<String>,
}
1121
1122fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
1126 use serde_json::{json, Value};
1127 let Some(map) = state.extras.get_mut("clusters") else {
1128 return;
1129 };
1130 let Some(entry) = map.get_mut(cluster_id) else {
1131 return;
1132 };
1133 let Some(obj) = entry.as_object_mut() else {
1134 return;
1135 };
1136 let mut members: Vec<Value> = obj
1137 .get("DBClusterMembers")
1138 .and_then(|v| v.as_array())
1139 .cloned()
1140 .unwrap_or_default();
1141 if members
1142 .iter()
1143 .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
1144 {
1145 return;
1146 }
1147 let has_writer = members
1148 .iter()
1149 .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
1150 let promotion_tier = (members.len() as i64) + 1;
1151 members.push(json!({
1152 "DBInstanceIdentifier": instance_id,
1153 "IsClusterWriter": !has_writer,
1154 "DBClusterParameterGroupStatus": "in-sync",
1155 "PromotionTier": promotion_tier,
1156 }));
1157 obj.insert("DBClusterMembers".to_string(), Value::Array(members));
1158 if !has_writer {
1159 obj.insert(
1160 "WriterDBInstanceIdentifier".to_string(),
1161 Value::String(instance_id.to_string()),
1162 );
1163 }
1164}
1165
1166#[path = "../service_helpers.rs"]
1167pub(crate) mod service_helpers;
1168pub(crate) use service_helpers::*;
1169
1170#[cfg(test)]
1171#[path = "../service_tests.rs"]
1172mod tests;