1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18 default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19 DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20 SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
23const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
25const SUPPORTED_ACTIONS: &[&str] = &[
26 "AddRoleToDBCluster",
27 "AddRoleToDBInstance",
28 "AddSourceIdentifierToSubscription",
29 "AddTagsToResource",
30 "ApplyPendingMaintenanceAction",
31 "AuthorizeDBSecurityGroupIngress",
32 "BacktrackDBCluster",
33 "CancelExportTask",
34 "CopyDBClusterParameterGroup",
35 "CopyDBClusterSnapshot",
36 "CopyDBParameterGroup",
37 "CopyDBSnapshot",
38 "CopyOptionGroup",
39 "CreateBlueGreenDeployment",
40 "CreateCustomDBEngineVersion",
41 "CreateDBCluster",
42 "CreateDBClusterEndpoint",
43 "CreateDBClusterParameterGroup",
44 "CreateDBClusterSnapshot",
45 "CreateDBInstance",
46 "CreateDBInstanceReadReplica",
47 "CreateDBParameterGroup",
48 "CreateDBProxy",
49 "CreateDBProxyEndpoint",
50 "CreateDBSecurityGroup",
51 "CreateDBShardGroup",
52 "CreateDBSnapshot",
53 "CreateDBSubnetGroup",
54 "CreateEventSubscription",
55 "CreateGlobalCluster",
56 "CreateIntegration",
57 "CreateOptionGroup",
58 "CreateTenantDatabase",
59 "DeleteBlueGreenDeployment",
60 "DeleteCustomDBEngineVersion",
61 "DeleteDBCluster",
62 "DeleteDBClusterAutomatedBackup",
63 "DeleteDBClusterEndpoint",
64 "DeleteDBClusterParameterGroup",
65 "DeleteDBClusterSnapshot",
66 "DeleteDBInstance",
67 "DeleteDBInstanceAutomatedBackup",
68 "DeleteDBParameterGroup",
69 "DeleteDBProxy",
70 "DeleteDBProxyEndpoint",
71 "DeleteDBSecurityGroup",
72 "DeleteDBShardGroup",
73 "DeleteDBSnapshot",
74 "DeleteDBSubnetGroup",
75 "DeleteEventSubscription",
76 "DeleteGlobalCluster",
77 "DeleteIntegration",
78 "DeleteOptionGroup",
79 "DeleteTenantDatabase",
80 "DeregisterDBProxyTargets",
81 "DescribeAccountAttributes",
82 "DescribeBlueGreenDeployments",
83 "DescribeCertificates",
84 "DescribeDBClusterAutomatedBackups",
85 "DescribeDBClusterBacktracks",
86 "DescribeDBClusterEndpoints",
87 "DescribeDBClusterParameterGroups",
88 "DescribeDBClusterParameters",
89 "DescribeDBClusterSnapshotAttributes",
90 "DescribeDBClusterSnapshots",
91 "DescribeDBClusters",
92 "DescribeDBEngineVersions",
93 "DescribeDBInstanceAutomatedBackups",
94 "DescribeDBInstances",
95 "DescribeDBLogFiles",
96 "DescribeDBMajorEngineVersions",
97 "DescribeDBParameterGroups",
98 "DescribeDBParameters",
99 "DescribeDBProxies",
100 "DescribeDBProxyEndpoints",
101 "DescribeDBProxyTargetGroups",
102 "DescribeDBProxyTargets",
103 "DescribeDBRecommendations",
104 "DescribeDBSecurityGroups",
105 "DescribeDBShardGroups",
106 "DescribeDBSnapshotAttributes",
107 "DescribeDBSnapshotTenantDatabases",
108 "DescribeDBSnapshots",
109 "DescribeDBSubnetGroups",
110 "DescribeEngineDefaultClusterParameters",
111 "DescribeEngineDefaultParameters",
112 "DescribeEventCategories",
113 "DescribeEventSubscriptions",
114 "DescribeEvents",
115 "DescribeExportTasks",
116 "DescribeGlobalClusters",
117 "DescribeIntegrations",
118 "DescribeOptionGroupOptions",
119 "DescribeOptionGroups",
120 "DescribeOrderableDBInstanceOptions",
121 "DescribePendingMaintenanceActions",
122 "DescribeReservedDBInstances",
123 "DescribeReservedDBInstancesOfferings",
124 "DescribeServerlessV2PlatformVersions",
125 "DescribeSourceRegions",
126 "DescribeTenantDatabases",
127 "DescribeValidDBInstanceModifications",
128 "DisableHttpEndpoint",
129 "DownloadDBLogFilePortion",
130 "EnableHttpEndpoint",
131 "FailoverDBCluster",
132 "FailoverGlobalCluster",
133 "ListTagsForResource",
134 "ModifyActivityStream",
135 "ModifyCertificates",
136 "ModifyCurrentDBClusterCapacity",
137 "ModifyCustomDBEngineVersion",
138 "ModifyDBCluster",
139 "ModifyDBClusterEndpoint",
140 "ModifyDBClusterParameterGroup",
141 "ModifyDBClusterSnapshotAttribute",
142 "ModifyDBInstance",
143 "ModifyDBParameterGroup",
144 "ModifyDBProxy",
145 "ModifyDBProxyEndpoint",
146 "ModifyDBProxyTargetGroup",
147 "ModifyDBRecommendation",
148 "ModifyDBShardGroup",
149 "ModifyDBSnapshot",
150 "ModifyDBSnapshotAttribute",
151 "ModifyDBSubnetGroup",
152 "ModifyEventSubscription",
153 "ModifyGlobalCluster",
154 "ModifyIntegration",
155 "ModifyOptionGroup",
156 "ModifyTenantDatabase",
157 "PromoteReadReplica",
158 "PromoteReadReplicaDBCluster",
159 "PurchaseReservedDBInstancesOffering",
160 "RebootDBCluster",
161 "RebootDBInstance",
162 "RebootDBShardGroup",
163 "RegisterDBProxyTargets",
164 "RemoveFromGlobalCluster",
165 "RemoveRoleFromDBCluster",
166 "RemoveRoleFromDBInstance",
167 "RemoveSourceIdentifierFromSubscription",
168 "RemoveTagsFromResource",
169 "ResetDBClusterParameterGroup",
170 "ResetDBParameterGroup",
171 "RestoreDBClusterFromS3",
172 "RestoreDBClusterFromSnapshot",
173 "RestoreDBClusterToPointInTime",
174 "RestoreDBInstanceFromDBSnapshot",
175 "RestoreDBInstanceFromS3",
176 "RestoreDBInstanceToPointInTime",
177 "RevokeDBSecurityGroupIngress",
178 "StartActivityStream",
179 "StartDBCluster",
180 "StartDBInstance",
181 "StartDBInstanceAutomatedBackupsReplication",
182 "StartExportTask",
183 "StopActivityStream",
184 "StopDBCluster",
185 "StopDBInstance",
186 "StopDBInstanceAutomatedBackupsReplication",
187 "SwitchoverBlueGreenDeployment",
188 "SwitchoverGlobalCluster",
189 "SwitchoverReadReplica",
190];
191
192pub struct RdsService {
193 pub(crate) state: SharedRdsState,
194 runtime: Option<Arc<RdsRuntime>>,
195 snapshot_store: Option<Arc<dyn SnapshotStore>>,
196 snapshot_lock: Arc<AsyncMutex<()>>,
197 pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
198}
199
200#[derive(Clone, Copy)]
202#[allow(dead_code, clippy::enum_variant_names)]
203pub(crate) enum RdsSourceType {
204 DbInstance,
205 DbSnapshot,
206 DbParameterGroup,
207 DbCluster,
208 DbClusterSnapshot,
209}
210
211impl RdsSourceType {
212 fn as_str(self) -> &'static str {
215 match self {
216 Self::DbInstance => "DB_INSTANCE",
217 Self::DbSnapshot => "DB_SNAPSHOT",
218 Self::DbParameterGroup => "DB_PARAMETER_GROUP",
219 Self::DbCluster => "DB_CLUSTER",
220 Self::DbClusterSnapshot => "DB_CLUSTER_SNAPSHOT",
221 }
222 }
223
224 pub(crate) fn describe_events_str(self) -> &'static str {
229 match self {
230 Self::DbInstance => "db-instance",
231 Self::DbSnapshot => "db-snapshot",
232 Self::DbParameterGroup => "db-parameter-group",
233 Self::DbCluster => "db-cluster",
234 Self::DbClusterSnapshot => "db-cluster-snapshot",
235 }
236 }
237
238 fn detail_type(self) -> &'static str {
239 match self {
240 Self::DbInstance => "RDS DB Instance Event",
241 Self::DbSnapshot => "RDS DB Snapshot Event",
242 Self::DbParameterGroup => "RDS DB Parameter Group Event",
243 Self::DbCluster => "RDS DB Cluster Event",
244 Self::DbClusterSnapshot => "RDS DB Cluster Snapshot Event",
245 }
246 }
247}
248
249mod cluster_snapshots;
250mod engine;
251mod instances;
252mod log_files;
253mod parameter_groups;
254mod replicas;
255mod restore;
256mod snapshots;
257mod subnet_groups;
258mod tags;
259
260impl RdsService {
261 pub(crate) fn state_handle(&self) -> &SharedRdsState {
262 &self.state
263 }
264}
265
266impl RdsService {
267 pub fn new(state: SharedRdsState) -> Self {
268 Self {
269 state,
270 runtime: None,
271 snapshot_store: None,
272 snapshot_lock: Arc::new(AsyncMutex::new(())),
273 delivery_bus: None,
274 }
275 }
276
277 pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
278 self.runtime = Some(runtime);
279 self
280 }
281
282 pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
286 self.runtime.as_ref()
287 }
288
289 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
290 self.snapshot_store = Some(store);
291 self
292 }
293
294 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
295 self.delivery_bus = Some(bus);
296 self
297 }
298
299 pub(crate) fn emit_event(
304 &self,
305 source_type: RdsSourceType,
306 source_identifier: &str,
307 source_arn: &str,
308 event_id: &str,
309 event_categories: &[&str],
310 message: &str,
311 ) {
312 let account_id = source_arn.split(':').nth(4).unwrap_or("");
315 emit_event_static_with_state(
316 self.delivery_bus.as_ref(),
317 Some(&self.state),
318 if account_id.is_empty() {
319 None
320 } else {
321 Some(account_id)
322 },
323 source_type,
324 source_identifier,
325 source_arn,
326 event_id,
327 event_categories,
328 message,
329 );
330 }
331
332 async fn save_snapshot(&self) {
333 save_snapshot_static(
334 self.state.clone(),
335 self.snapshot_store.clone(),
336 self.snapshot_lock.clone(),
337 )
338 .await;
339 }
340
341 pub async fn recover_persisted_containers(&self) {
350 let Some(runtime) = self.runtime.clone() else {
351 return;
352 };
353
354 struct Pending {
355 account_id: String,
356 region: String,
357 id: String,
358 arn: String,
359 engine: String,
360 engine_version: String,
361 username: String,
362 password: String,
363 db_name: String,
364 tags: Vec<crate::state::RdsTag>,
365 }
366
367 let pending: Vec<Pending> = {
368 let mut accounts = self.state.write();
369 let mut out = Vec::new();
370 for (_, state) in accounts.iter_mut() {
371 let account_id = state.account_id.clone();
372 let region = state.region.clone();
373 for (id, inst) in state.instances.iter_mut() {
374 if !matches!(
382 inst.db_instance_status.as_str(),
383 "creating"
384 | "available"
385 | "starting"
386 | "modifying"
387 | "rebooting"
388 | "backing-up"
389 ) {
390 continue;
391 }
392 inst.db_instance_status = "starting".to_string();
393 out.push(Pending {
394 account_id: account_id.clone(),
395 region: region.clone(),
396 id: id.clone(),
397 arn: inst.db_instance_arn.clone(),
398 engine: inst.engine.clone(),
399 engine_version: inst.engine_version.clone(),
400 username: inst.master_username.clone(),
401 password: inst.master_user_password.clone(),
402 db_name: inst
403 .db_name
404 .clone()
405 .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
406 tags: inst.tags.clone(),
407 });
408 }
409 }
410 out
411 };
412
413 if pending.is_empty() {
414 return;
415 }
416 tracing::info!(
417 count = pending.len(),
418 "recovering backing containers for persisted rds instances",
419 );
420
421 for p in pending {
422 let runtime = runtime.clone();
423 let state = self.state.clone();
424 let snapshot_store = self.snapshot_store.clone();
425 let snapshot_lock = self.snapshot_lock.clone();
426 let delivery_bus = self.delivery_bus.clone();
427 tokio::spawn(async move {
428 match runtime
429 .ensure_postgres(
430 &p.id,
431 &p.engine,
432 &p.engine_version,
433 &p.username,
434 &p.password,
435 &p.db_name,
436 &p.account_id,
437 &p.region,
438 &p.tags,
439 )
440 .await
441 {
442 Ok(running) => {
443 {
444 let mut accounts = state.write();
445 if let Some(s) = accounts.get_mut(&p.account_id) {
446 if let Some(inst) = s.instances.get_mut(&p.id) {
447 inst.db_instance_status = "available".to_string();
448 inst.endpoint_address = running.endpoint_address.clone();
449 inst.port = i32::from(running.endpoint_port);
450 inst.host_port = running.host_port;
451 inst.container_id = running.container_id;
452 }
453 }
454 }
455 save_snapshot_static(
456 state.clone(),
457 snapshot_store.clone(),
458 snapshot_lock.clone(),
459 )
460 .await;
461 emit_event_static(
462 delivery_bus.as_ref(),
463 RdsSourceType::DbInstance,
464 &p.id,
465 &p.arn,
466 "RDS-EVENT-0088",
467 &["notification"],
468 "DB instance restarted after fakecloud restart",
469 );
470 }
471 Err(error) => {
472 tracing::error!(
473 %error,
474 db_instance_identifier = %p.id,
475 "failed to recover rds backing container after restart",
476 );
477 {
478 let mut accounts = state.write();
479 if let Some(s) = accounts.get_mut(&p.account_id) {
480 if let Some(inst) = s.instances.get_mut(&p.id) {
481 inst.db_instance_status = "failed".to_string();
482 }
483 }
484 }
485 save_snapshot_static(state, snapshot_store, snapshot_lock).await;
486 }
487 }
488 });
489 }
490 }
491
492 async fn stop_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
498 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
499
500 let arn = {
505 let accounts = self.state.read();
506 let empty = RdsState::new(&request.account_id, &request.region);
507 let state = accounts.get(&request.account_id).unwrap_or(&empty);
508 state
509 .instances
510 .get(&db_instance_identifier)
511 .map(|i| i.db_instance_arn.clone())
512 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
513 };
514
515 if let Some(runtime) = self.runtime.as_ref() {
516 runtime.stop_container(&db_instance_identifier).await;
517 }
518
519 let instance = {
520 let mut accounts = self.state.write();
521 let state = accounts.get_or_create(&request.account_id);
522 let inst = state
523 .instances
524 .get_mut(&db_instance_identifier)
525 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
526 inst.db_instance_status = "stopped".to_string();
527 inst.container_id = String::new();
528 inst.clone()
529 };
530
531 self.emit_event(
532 RdsSourceType::DbInstance,
533 &db_instance_identifier,
534 &arn,
535 "RDS-EVENT-0089",
536 &["notification"],
537 "DB instance stopped",
538 );
539
540 Ok(AwsResponse::xml(
541 StatusCode::OK,
542 query_response_xml(
543 "StopDBInstance",
544 RDS_NS,
545 &format!(
546 "<DBInstance>{}</DBInstance>",
547 db_instance_xml(&instance, Some("stopped"))
548 ),
549 &request.request_id,
550 ),
551 ))
552 }
553
554 async fn start_db_instance(
557 &self,
558 request: &AwsRequest,
559 ) -> Result<AwsResponse, AwsServiceError> {
560 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
561
562 let instance = {
566 let accounts = self.state.read();
567 let empty = RdsState::new(&request.account_id, &request.region);
568 let state = accounts.get(&request.account_id).unwrap_or(&empty);
569 state
570 .instances
571 .get(&db_instance_identifier)
572 .cloned()
573 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
574 };
575
576 {
579 let mut accounts = self.state.write();
580 let state = accounts.get_or_create(&request.account_id);
581 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
582 inst.db_instance_status = "starting".to_string();
583 }
584 }
585
586 let Some(runtime) = self.runtime.clone() else {
591 {
592 let mut accounts = self.state.write();
593 let state = accounts.get_or_create(&request.account_id);
594 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
595 inst.db_instance_status = "stopped".to_string();
596 }
597 }
598 return Err(AwsServiceError::aws_error(
599 StatusCode::SERVICE_UNAVAILABLE,
600 "InternalFailure",
601 "Container runtime is not configured; cannot start DB instance",
602 ));
603 };
604
605 {
612 let state_handle = self.state.clone();
613 let delivery_bus = self.delivery_bus.clone();
614 let snapshot_store = self.snapshot_store.clone();
615 let snapshot_lock = self.snapshot_lock.clone();
616 let id = db_instance_identifier.clone();
617 let account_id = request.account_id.clone();
618 let region = request.region.clone();
619 let inst = instance.clone();
620 tokio::spawn(async move {
621 let logical_db = inst
622 .db_name
623 .clone()
624 .unwrap_or_else(|| default_db_name(&inst.engine).to_string());
625 match runtime
626 .ensure_postgres(
627 &id,
628 &inst.engine,
629 &inst.engine_version,
630 &inst.master_username,
631 &inst.master_user_password,
632 &logical_db,
633 &account_id,
634 ®ion,
635 &inst.tags,
636 )
637 .await
638 {
639 Ok(r) => {
640 let arn = {
641 let mut accounts = state_handle.write();
642 let state = accounts.get_or_create(&account_id);
643 let Some(inst) = state.instances.get_mut(&id) else {
644 return;
645 };
646 inst.db_instance_status = "available".to_string();
647 inst.endpoint_address = r.endpoint_address.clone();
648 inst.port = i32::from(r.endpoint_port);
649 inst.host_port = r.host_port;
650 inst.container_id = r.container_id;
651 inst.db_instance_arn.clone()
652 };
653 emit_event_static_with_state(
654 delivery_bus.as_ref(),
655 Some(&state_handle),
656 Some(&account_id),
657 RdsSourceType::DbInstance,
658 &id,
659 &arn,
660 "RDS-EVENT-0088",
661 &["notification"],
662 "DB instance started",
663 );
664 save_snapshot_static(state_handle.clone(), snapshot_store, snapshot_lock)
665 .await;
666 }
667 Err(_) => {
668 let mut accounts = state_handle.write();
671 let state = accounts.get_or_create(&account_id);
672 if let Some(inst) = state.instances.get_mut(&id) {
673 inst.db_instance_status = "stopped".to_string();
674 }
675 }
676 }
677 });
678 }
679
680 Ok(AwsResponse::xml(
681 StatusCode::OK,
682 query_response_xml(
683 "StartDBInstance",
684 RDS_NS,
685 &format!(
686 "<DBInstance>{}</DBInstance>",
687 db_instance_xml(&instance, Some("starting"))
688 ),
689 &request.request_id,
690 ),
691 ))
692 }
693}
694
695fn is_declared_add_tags_not_found(code: &str) -> bool {
703 matches!(
704 code,
705 "BlueGreenDeploymentNotFoundFault"
706 | "DBClusterNotFoundFault"
707 | "DBInstanceNotFound"
708 | "DBProxyEndpointNotFoundFault"
709 | "DBProxyNotFoundFault"
710 | "DBProxyTargetGroupNotFoundFault"
711 | "DBShardGroupNotFound"
712 | "DBSnapshotNotFound"
713 | "DBSnapshotTenantDatabaseNotFoundFault"
714 | "IntegrationNotFoundFault"
715 | "InvalidDBClusterEndpointStateFault"
716 | "InvalidDBClusterStateFault"
717 | "InvalidDBInstanceState"
718 | "TenantDatabaseNotFound"
719 )
720}
721
722async fn save_snapshot_static(
724 state: SharedRdsState,
725 store: Option<Arc<dyn SnapshotStore>>,
726 lock: Arc<AsyncMutex<()>>,
727) {
728 let Some(store) = store else {
729 return;
730 };
731 let _guard = lock.lock().await;
732 let snapshot = RdsSnapshot {
733 schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
734 state: None,
735 accounts: Some(state.read().clone()),
736 };
737 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
738 let bytes = serde_json::to_vec(&snapshot)
739 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
740 store.save(&bytes)
741 })
742 .await;
743 match join {
744 Ok(Ok(())) => {}
745 Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
746 Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
747 }
748}
749
750impl RdsService {
751 fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
762 self.runtime.as_ref().ok_or_else(|| {
763 AwsServiceError::aws_error(
764 StatusCode::SERVICE_UNAVAILABLE,
765 "InsufficientDBInstanceCapacity",
766 format!(
767 "Docker/Podman is required for RDS DB instances but is not available. {}",
768 fakecloud_core::container_net::CONTAINER_RUNTIME_HINT
769 ),
770 )
771 })
772 }
773
774 #[allow(clippy::too_many_arguments)]
784 fn spawn_finalize_restored_instance(
785 &self,
786 runtime: Arc<RdsRuntime>,
787 account_id: String,
788 region: String,
789 id: String,
790 arn: String,
791 engine: String,
792 engine_version: String,
793 master_username: String,
794 master_user_password: String,
795 logical_db: String,
796 tags: Vec<RdsTag>,
797 dump: Option<Vec<u8>>,
798 created_event: (&'static str, &'static str),
799 ) {
800 let state_handle = self.state.clone();
801 let delivery_bus = self.delivery_bus.clone();
802 let snapshot_store = self.snapshot_store.clone();
803 let snapshot_lock = self.snapshot_lock.clone();
804 let (event_id, event_message) = created_event;
805 async fn fail(
808 state_handle: &SharedRdsState,
809 snapshot_store: Option<Arc<dyn SnapshotStore>>,
810 snapshot_lock: Arc<AsyncMutex<()>>,
811 delivery_bus: Option<&Arc<DeliveryBus>>,
812 runtime: &Arc<RdsRuntime>,
813 account_id: &str,
814 id: &str,
815 arn: &str,
816 error: &str,
817 ) {
818 tracing::error!(%error, db_instance_identifier=%id, "restore/replica background finalize failed");
819 {
820 let mut accounts = state_handle.write();
821 let state = accounts.get_or_create(account_id);
822 state.instances.remove(id);
823 for inst in state.instances.values_mut() {
827 inst.read_replica_db_instance_identifiers
828 .retain(|r| r != id);
829 }
830 }
831 runtime.stop_container(id).await;
832 save_snapshot_static(state_handle.clone(), snapshot_store, snapshot_lock).await;
833 emit_event_static(
834 delivery_bus,
835 RdsSourceType::DbInstance,
836 id,
837 arn,
838 "RDS-EVENT-0058",
839 &["failure"],
840 &format!("DB instance failed to create: {error}"),
841 );
842 }
843 tokio::spawn(async move {
844 let running = match runtime
845 .ensure_postgres(
846 &id,
847 &engine,
848 &engine_version,
849 &master_username,
850 &master_user_password,
851 &logical_db,
852 &account_id,
853 ®ion,
854 &tags,
855 )
856 .await
857 {
858 Ok(running) => running,
859 Err(error) => {
860 fail(
861 &state_handle,
862 snapshot_store,
863 snapshot_lock,
864 delivery_bus.as_ref(),
865 &runtime,
866 &account_id,
867 &id,
868 &arn,
869 &error.to_string(),
870 )
871 .await;
872 return;
873 }
874 };
875
876 if let Some(dump) = dump {
877 if let Err(error) = runtime
878 .restore_database(
879 &id,
880 &engine,
881 &master_username,
882 &master_user_password,
883 &logical_db,
884 &dump,
885 )
886 .await
887 {
888 fail(
891 &state_handle,
892 snapshot_store,
893 snapshot_lock,
894 delivery_bus.as_ref(),
895 &runtime,
896 &account_id,
897 &id,
898 &arn,
899 &error.to_string(),
900 )
901 .await;
902 return;
903 }
904 }
905
906 let instance_present = {
907 let mut accounts = state_handle.write();
908 let state = accounts.get_or_create(&account_id);
909 if let Some(inst) = state.instances.get_mut(&id) {
910 inst.db_instance_status = "available".to_string();
911 inst.endpoint_address = running.endpoint_address.clone();
912 inst.port = i32::from(running.endpoint_port);
913 inst.host_port = running.host_port;
914 inst.container_id = running.container_id.clone();
915 true
916 } else {
917 false
918 }
919 };
920 if !instance_present {
921 runtime.stop_container(&id).await;
923 save_snapshot_static(state_handle.clone(), snapshot_store, snapshot_lock).await;
924 return;
925 }
926 emit_event_static_with_state(
927 delivery_bus.as_ref(),
928 Some(&state_handle),
929 Some(&account_id),
930 RdsSourceType::DbInstance,
931 &id,
932 &arn,
933 event_id,
934 &["creation"],
935 event_message,
936 );
937 save_snapshot_static(state_handle.clone(), snapshot_store, snapshot_lock).await;
938 });
939 }
940
941 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
945 let store = self.snapshot_store.clone()?;
946 let state = self.state.clone();
947 let lock = self.snapshot_lock.clone();
948 Some(Arc::new(move || {
949 let state = state.clone();
950 let store = store.clone();
951 let lock = lock.clone();
952 Box::pin(async move {
953 save_snapshot_static(state, Some(store), lock).await;
954 })
955 }))
956 }
957}
958
959#[async_trait]
960impl AwsService for RdsService {
961 fn service_name(&self) -> &str {
962 "rds"
963 }
964
965 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
966 crate::validation::prevalidate(request.action.as_str(), &request)?;
971
972 let mutates = is_mutating_action(request.action.as_str());
973 let result = match request.action.as_str() {
974 "AddTagsToResource" => self.add_tags_to_resource(&request),
975 "CreateDBInstance" => self.create_db_instance(&request).await,
976 "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
977 "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
978 "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
979 "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
980 "DeleteDBInstance" => self.delete_db_instance(&request).await,
981 "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
982 "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
983 "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
984 "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
985 "DescribeDBInstances" => self.describe_db_instances(&request),
986 "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
987 "DescribeDBParameters" => self.describe_db_parameters_real(&request),
988 "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
989 "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
990 "DescribeOrderableDBInstanceOptions" => {
991 self.describe_orderable_db_instance_options(&request)
992 }
993 "ListTagsForResource" => self.list_tags_for_resource(&request),
994 "ModifyDBInstance" => self.modify_db_instance(&request),
995 "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
996 "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
997 "RebootDBInstance" => self.reboot_db_instance(&request).await,
998 "StartDBInstance" => self.start_db_instance(&request).await,
999 "StopDBInstance" => self.stop_db_instance(&request).await,
1000 "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
1001 "RestoreDBInstanceFromDBSnapshot" => {
1002 self.restore_db_instance_from_db_snapshot(&request).await
1003 }
1004 "RestoreDBInstanceToPointInTime" => {
1005 self.restore_db_instance_to_point_in_time(&request).await
1006 }
1007 "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
1008 "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
1009 "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
1010 "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
1011 "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
1012 "RestoreDBClusterToPointInTime" => {
1013 self.restore_db_cluster_to_point_in_time(&request).await
1014 }
1015 _ => self.handle_extra_action(&request),
1016 };
1017 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
1018 self.save_snapshot().await;
1019 }
1020 result
1021 }
1022
1023 fn supported_actions(&self) -> &[&str] {
1024 SUPPORTED_ACTIONS
1025 }
1026}
1027
1028impl RdsService {}
1029
1030pub(crate) fn render_user_parameter_xml(name: &str, value: &str, apply_method: &str) -> String {
1034 format!(
1035 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>user</Source>\n <ApplyType>dynamic</ApplyType>\n <ApplyMethod>{}</ApplyMethod>\n <DataType>string</DataType>\n <IsModifiable>true</IsModifiable>\n </Parameter>\n",
1036 xml_escape(name),
1037 xml_escape(value),
1038 xml_escape(apply_method),
1039 )
1040}
1041
1042pub(crate) fn render_engine_default_parameter_xml(
1046 default: &crate::state::EngineDefaultParameter,
1047) -> String {
1048 format!(
1049 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>engine-default</Source>\n <ApplyType>{}</ApplyType>\n <DataType>{}</DataType>\n <AllowedValues>{}</AllowedValues>\n <IsModifiable>{}</IsModifiable>\n </Parameter>\n",
1050 xml_escape(default.name),
1051 xml_escape(default.value),
1052 xml_escape(default.apply_type),
1053 xml_escape(default.data_type),
1054 xml_escape(default.allowed_values),
1055 default.is_modifiable,
1056 )
1057}
1058
1059pub(crate) struct DbParameterInput {
1069 pub name: String,
1070 pub value: String,
1071 pub apply_method: String,
1072}
1073
1074pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<DbParameterInput> {
1075 let mut out = Vec::new();
1076 for prefix in ["Parameters.Parameter", "Parameters.member"] {
1077 let mut index = 1;
1078 loop {
1079 let name_key = format!("{prefix}.{index}.ParameterName");
1080 let value_key = format!("{prefix}.{index}.ParameterValue");
1081 let apply_key = format!("{prefix}.{index}.ApplyMethod");
1082 let name = optional_query_param(request, &name_key);
1083 let value = optional_query_param(request, &value_key);
1084 if name.is_none() && value.is_none() {
1085 break;
1086 }
1087 if let (Some(n), Some(v)) = (name, value) {
1088 if !n.is_empty() {
1089 let apply_method = optional_query_param(request, &apply_key)
1090 .filter(|m| !m.is_empty())
1091 .unwrap_or_else(|| "immediate".to_string());
1092 out.push(DbParameterInput {
1093 name: n,
1094 value: v,
1095 apply_method,
1096 });
1097 }
1098 }
1099 index += 1;
1100 }
1101 }
1102 out
1103}
1104
1105fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
1109 match (engine, log_file_name) {
1110 (_, "error/postgres.log") => "/var/log/postgresql/postgresql.log".to_string(),
1111 (_, "trace/postgres-trace.log") => "/var/log/postgresql/postgresql.log".to_string(),
1112 ("mysql" | "mariadb", "error/mysql-error.log") => "/var/log/mysql/error.log".to_string(),
1113 ("mysql" | "mariadb", "slowquery/mysql-slowquery.log") => {
1114 "/var/log/mysql/slow.log".to_string()
1115 }
1116 _ => log_file_name.to_string(),
1117 }
1118}
1119
1120pub(crate) struct PaginationResult<T> {
1121 items: Vec<T>,
1122 next_marker: Option<String>,
1123}
1124
1125fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
1129 use serde_json::{json, Value};
1130 let Some(map) = state.extras.get_mut("clusters") else {
1131 return;
1132 };
1133 let Some(entry) = map.get_mut(cluster_id) else {
1134 return;
1135 };
1136 let Some(obj) = entry.as_object_mut() else {
1137 return;
1138 };
1139 let mut members: Vec<Value> = obj
1140 .get("DBClusterMembers")
1141 .and_then(|v| v.as_array())
1142 .cloned()
1143 .unwrap_or_default();
1144 if members
1145 .iter()
1146 .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
1147 {
1148 return;
1149 }
1150 let has_writer = members
1151 .iter()
1152 .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
1153 let promotion_tier = (members.len() as i64) + 1;
1154 members.push(json!({
1155 "DBInstanceIdentifier": instance_id,
1156 "IsClusterWriter": !has_writer,
1157 "DBClusterParameterGroupStatus": "in-sync",
1158 "PromotionTier": promotion_tier,
1159 }));
1160 obj.insert("DBClusterMembers".to_string(), Value::Array(members));
1161 if !has_writer {
1162 obj.insert(
1163 "WriterDBInstanceIdentifier".to_string(),
1164 Value::String(instance_id.to_string()),
1165 );
1166 }
1167}
1168
1169#[path = "../service_helpers.rs"]
1170pub(crate) mod service_helpers;
1171pub(crate) use service_helpers::*;
1172
1173#[cfg(test)]
1174#[path = "../service_tests.rs"]
1175mod tests;