1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18 default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19 DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20 SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
/// XML namespace passed to `query_response_xml` when this file builds RDS responses.
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
/// Action names returned by `RdsService::supported_actions`.
///
/// Only some of these are dispatched by name in `handle`; every other action
/// is routed to `handle_extra_action`.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddRoleToDBCluster",
    "AddRoleToDBInstance",
    "AddSourceIdentifierToSubscription",
    "AddTagsToResource",
    "ApplyPendingMaintenanceAction",
    "AuthorizeDBSecurityGroupIngress",
    "BacktrackDBCluster",
    "CancelExportTask",
    "CopyDBClusterParameterGroup",
    "CopyDBClusterSnapshot",
    "CopyDBParameterGroup",
    "CopyDBSnapshot",
    "CopyOptionGroup",
    "CreateBlueGreenDeployment",
    "CreateCustomDBEngineVersion",
    "CreateDBCluster",
    "CreateDBClusterEndpoint",
    "CreateDBClusterParameterGroup",
    "CreateDBClusterSnapshot",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBProxy",
    "CreateDBProxyEndpoint",
    "CreateDBSecurityGroup",
    "CreateDBShardGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "CreateEventSubscription",
    "CreateGlobalCluster",
    "CreateIntegration",
    "CreateOptionGroup",
    "CreateTenantDatabase",
    "DeleteBlueGreenDeployment",
    "DeleteCustomDBEngineVersion",
    "DeleteDBCluster",
    "DeleteDBClusterAutomatedBackup",
    "DeleteDBClusterEndpoint",
    "DeleteDBClusterParameterGroup",
    "DeleteDBClusterSnapshot",
    "DeleteDBInstance",
    "DeleteDBInstanceAutomatedBackup",
    "DeleteDBParameterGroup",
    "DeleteDBProxy",
    "DeleteDBProxyEndpoint",
    "DeleteDBSecurityGroup",
    "DeleteDBShardGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DeleteEventSubscription",
    "DeleteGlobalCluster",
    "DeleteIntegration",
    "DeleteOptionGroup",
    "DeleteTenantDatabase",
    "DeregisterDBProxyTargets",
    "DescribeAccountAttributes",
    "DescribeBlueGreenDeployments",
    "DescribeCertificates",
    "DescribeDBClusterAutomatedBackups",
    "DescribeDBClusterBacktracks",
    "DescribeDBClusterEndpoints",
    "DescribeDBClusterParameterGroups",
    "DescribeDBClusterParameters",
    "DescribeDBClusterSnapshotAttributes",
    "DescribeDBClusterSnapshots",
    "DescribeDBClusters",
    "DescribeDBEngineVersions",
    "DescribeDBInstanceAutomatedBackups",
    "DescribeDBInstances",
    "DescribeDBLogFiles",
    "DescribeDBMajorEngineVersions",
    "DescribeDBParameterGroups",
    "DescribeDBParameters",
    "DescribeDBProxies",
    "DescribeDBProxyEndpoints",
    "DescribeDBProxyTargetGroups",
    "DescribeDBProxyTargets",
    "DescribeDBRecommendations",
    "DescribeDBSecurityGroups",
    "DescribeDBShardGroups",
    "DescribeDBSnapshotAttributes",
    "DescribeDBSnapshotTenantDatabases",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeEngineDefaultClusterParameters",
    "DescribeEngineDefaultParameters",
    "DescribeEventCategories",
    "DescribeEventSubscriptions",
    "DescribeEvents",
    "DescribeExportTasks",
    "DescribeGlobalClusters",
    "DescribeIntegrations",
    "DescribeOptionGroupOptions",
    "DescribeOptionGroups",
    "DescribeOrderableDBInstanceOptions",
    "DescribePendingMaintenanceActions",
    "DescribeReservedDBInstances",
    "DescribeReservedDBInstancesOfferings",
    "DescribeSourceRegions",
    "DescribeTenantDatabases",
    "DescribeValidDBInstanceModifications",
    "DisableHttpEndpoint",
    "DownloadDBLogFilePortion",
    "EnableHttpEndpoint",
    "FailoverDBCluster",
    "FailoverGlobalCluster",
    "ListTagsForResource",
    "ModifyActivityStream",
    "ModifyCertificates",
    "ModifyCurrentDBClusterCapacity",
    "ModifyCustomDBEngineVersion",
    "ModifyDBCluster",
    "ModifyDBClusterEndpoint",
    "ModifyDBClusterParameterGroup",
    "ModifyDBClusterSnapshotAttribute",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBProxy",
    "ModifyDBProxyEndpoint",
    "ModifyDBProxyTargetGroup",
    "ModifyDBRecommendation",
    "ModifyDBShardGroup",
    "ModifyDBSnapshot",
    "ModifyDBSnapshotAttribute",
    "ModifyDBSubnetGroup",
    "ModifyEventSubscription",
    "ModifyGlobalCluster",
    "ModifyIntegration",
    "ModifyOptionGroup",
    "ModifyTenantDatabase",
    "PromoteReadReplica",
    "PromoteReadReplicaDBCluster",
    "PurchaseReservedDBInstancesOffering",
    "RebootDBCluster",
    "RebootDBInstance",
    "RebootDBShardGroup",
    "RegisterDBProxyTargets",
    "RemoveFromGlobalCluster",
    "RemoveRoleFromDBCluster",
    "RemoveRoleFromDBInstance",
    "RemoveSourceIdentifierFromSubscription",
    "RemoveTagsFromResource",
    "ResetDBClusterParameterGroup",
    "ResetDBParameterGroup",
    "RestoreDBClusterFromS3",
    "RestoreDBClusterFromSnapshot",
    "RestoreDBClusterToPointInTime",
    "RestoreDBInstanceFromDBSnapshot",
    "RestoreDBInstanceFromS3",
    "RestoreDBInstanceToPointInTime",
    "RevokeDBSecurityGroupIngress",
    "StartActivityStream",
    "StartDBCluster",
    "StartDBInstance",
    "StartDBInstanceAutomatedBackupsReplication",
    "StartExportTask",
    "StopActivityStream",
    "StopDBCluster",
    "StopDBInstance",
    "StopDBInstanceAutomatedBackupsReplication",
    "SwitchoverBlueGreenDeployment",
    "SwitchoverGlobalCluster",
    "SwitchoverReadReplica",
];
190
/// RDS request handler: shared state plus an optional container runtime,
/// snapshot store and delivery bus, each attached through a `with_*` builder.
pub struct RdsService {
    /// Shared RDS state, accessed through `read()` / `write()` guards.
    pub(crate) state: SharedRdsState,
    /// Container runtime; `None` until `with_runtime` is called.
    runtime: Option<Arc<RdsRuntime>>,
    /// Destination for serialized snapshots; when `None`, `save_snapshot_static` returns early.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async lock held by `save_snapshot_static` while a snapshot is captured and written.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Bus handed to the event-emitting helpers; `None` until `with_delivery_bus` is called.
    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
}
198
/// Kinds of RDS resources that an emitted event can be attributed to.
#[derive(Clone, Copy)]
#[allow(dead_code, clippy::enum_variant_names)]
pub(crate) enum RdsSourceType {
    DbInstance,
    DbSnapshot,
    DbParameterGroup,
    DbCluster,
    DbClusterSnapshot,
}

impl RdsSourceType {
    /// Single lookup table behind the three public-facing spellings.
    ///
    /// Returns `(upper-snake name, kebab-case name, detail type)`; keeping the
    /// spellings side by side makes it hard to add a variant and forget one.
    const fn labels(self) -> (&'static str, &'static str, &'static str) {
        match self {
            Self::DbInstance => ("DB_INSTANCE", "db-instance", "RDS DB Instance Event"),
            Self::DbSnapshot => ("DB_SNAPSHOT", "db-snapshot", "RDS DB Snapshot Event"),
            Self::DbParameterGroup => (
                "DB_PARAMETER_GROUP",
                "db-parameter-group",
                "RDS DB Parameter Group Event",
            ),
            Self::DbCluster => ("DB_CLUSTER", "db-cluster", "RDS DB Cluster Event"),
            Self::DbClusterSnapshot => (
                "DB_CLUSTER_SNAPSHOT",
                "db-cluster-snapshot",
                "RDS DB Cluster Snapshot Event",
            ),
        }
    }

    /// Upper-snake-case spelling, e.g. `DB_INSTANCE`.
    fn as_str(self) -> &'static str {
        self.labels().0
    }

    /// Lower-kebab-case spelling, e.g. `db-instance`.
    pub(crate) fn describe_events_str(self) -> &'static str {
        self.labels().1
    }

    /// Human-readable detail type, e.g. `RDS DB Instance Event`.
    fn detail_type(self) -> &'static str {
        self.labels().2
    }
}
247
impl RdsService {
    /// Returns a reference to the shared RDS state held by this service.
    pub(crate) fn state_handle(&self) -> &SharedRdsState {
        &self.state
    }
}
253
254impl RdsService {
255 pub fn new(state: SharedRdsState) -> Self {
256 Self {
257 state,
258 runtime: None,
259 snapshot_store: None,
260 snapshot_lock: Arc::new(AsyncMutex::new(())),
261 delivery_bus: None,
262 }
263 }
264
/// Builder-style setter: stores `runtime` and returns the updated service.
pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
    self.runtime = Some(runtime);
    self
}
269
/// Returns the configured runtime, or `None` when none has been attached.
pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
    self.runtime.as_ref()
}
276
/// Builder-style setter: stores the snapshot `store` and returns the updated service.
pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
    self.snapshot_store = Some(store);
    self
}
281
/// Builder-style setter: stores the delivery `bus` and returns the updated service.
pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
    self.delivery_bus = Some(bus);
    self
}
286
287 pub(crate) fn emit_event(
292 &self,
293 source_type: RdsSourceType,
294 source_identifier: &str,
295 source_arn: &str,
296 event_id: &str,
297 event_categories: &[&str],
298 message: &str,
299 ) {
300 let account_id = source_arn.split(':').nth(4).unwrap_or("");
303 emit_event_static_with_state(
304 self.delivery_bus.as_ref(),
305 Some(&self.state),
306 if account_id.is_empty() {
307 None
308 } else {
309 Some(account_id)
310 },
311 source_type,
312 source_identifier,
313 source_arn,
314 event_id,
315 event_categories,
316 message,
317 );
318 }
319
320 async fn save_snapshot(&self) {
321 save_snapshot_static(
322 self.state.clone(),
323 self.snapshot_store.clone(),
324 self.snapshot_lock.clone(),
325 )
326 .await;
327 }
328
/// Restarts backing containers for instances already present in state.
///
/// Does nothing when no runtime is configured or when no instance
/// qualifies. Each container start runs in a spawned task whose handle is
/// dropped, so this method returns without waiting for the starts to finish.
pub async fn recover_persisted_containers(&self) {
    let Some(runtime) = self.runtime.clone() else {
        return;
    };

    // Owned copy of the instance fields a spawned task needs, so the state
    // lock does not have to be held while the container starts.
    struct Pending {
        account_id: String,
        region: String,
        id: String,
        arn: String,
        engine: String,
        engine_version: String,
        username: String,
        password: String,
        db_name: String,
    }

    // Under a single write lock: select every instance whose status is in
    // the list below, flip it to "starting", and copy out what the task needs.
    let pending: Vec<Pending> = {
        let mut accounts = self.state.write();
        let mut out = Vec::new();
        for (_, state) in accounts.iter_mut() {
            let account_id = state.account_id.clone();
            let region = state.region.clone();
            for (id, inst) in state.instances.iter_mut() {
                // Instances in any other status are left untouched.
                if !matches!(
                    inst.db_instance_status.as_str(),
                    "available" | "starting" | "modifying" | "rebooting" | "backing-up"
                ) {
                    continue;
                }
                inst.db_instance_status = "starting".to_string();
                out.push(Pending {
                    account_id: account_id.clone(),
                    region: region.clone(),
                    id: id.clone(),
                    arn: inst.db_instance_arn.clone(),
                    engine: inst.engine.clone(),
                    engine_version: inst.engine_version.clone(),
                    username: inst.master_username.clone(),
                    password: inst.master_user_password.clone(),
                    // Fall back to the engine's default database name when none was stored.
                    db_name: inst
                        .db_name
                        .clone()
                        .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
                });
            }
        }
        out
    };

    if pending.is_empty() {
        return;
    }
    tracing::info!(
        count = pending.len(),
        "recovering backing containers for persisted rds instances",
    );

    for p in pending {
        // Each task gets its own handles; nothing borrowed from `self` crosses the spawn.
        let runtime = runtime.clone();
        let state = self.state.clone();
        let snapshot_store = self.snapshot_store.clone();
        let snapshot_lock = self.snapshot_lock.clone();
        let delivery_bus = self.delivery_bus.clone();
        tokio::spawn(async move {
            match runtime
                .ensure_postgres(
                    &p.id,
                    &p.engine,
                    &p.engine_version,
                    &p.username,
                    &p.password,
                    &p.db_name,
                    &p.account_id,
                    &p.region,
                )
                .await
            {
                Ok(running) => {
                    // Record the new endpoint; the write guard is dropped
                    // before the awaits that follow.
                    {
                        let mut accounts = state.write();
                        if let Some(s) = accounts.get_mut(&p.account_id) {
                            if let Some(inst) = s.instances.get_mut(&p.id) {
                                inst.db_instance_status = "available".to_string();
                                inst.endpoint_address = "127.0.0.1".to_string();
                                inst.port = i32::from(running.host_port);
                                inst.host_port = running.host_port;
                                inst.container_id = running.container_id;
                            }
                        }
                    }
                    save_snapshot_static(
                        state.clone(),
                        snapshot_store.clone(),
                        snapshot_lock.clone(),
                    )
                    .await;
                    emit_event_static(
                        delivery_bus.as_ref(),
                        RdsSourceType::DbInstance,
                        &p.id,
                        &p.arn,
                        "RDS-EVENT-0088",
                        &["notification"],
                        "DB instance restarted after fakecloud restart",
                    );
                }
                Err(error) => {
                    tracing::error!(
                        %error,
                        db_instance_identifier = %p.id,
                        "failed to recover rds backing container after restart",
                    );
                    // Mark the instance "failed" and persist that outcome.
                    {
                        let mut accounts = state.write();
                        if let Some(s) = accounts.get_mut(&p.account_id) {
                            if let Some(inst) = s.instances.get_mut(&p.id) {
                                inst.db_instance_status = "failed".to_string();
                            }
                        }
                    }
                    save_snapshot_static(state, snapshot_store, snapshot_lock).await;
                }
            }
        });
    }
}
464
465 async fn stop_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
471 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
472
473 let arn = {
478 let accounts = self.state.read();
479 let empty = RdsState::new(&request.account_id, &request.region);
480 let state = accounts.get(&request.account_id).unwrap_or(&empty);
481 state
482 .instances
483 .get(&db_instance_identifier)
484 .map(|i| i.db_instance_arn.clone())
485 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
486 };
487
488 if let Some(runtime) = self.runtime.as_ref() {
489 runtime.stop_container(&db_instance_identifier).await;
490 }
491
492 let instance = {
493 let mut accounts = self.state.write();
494 let state = accounts.get_or_create(&request.account_id);
495 let inst = state
496 .instances
497 .get_mut(&db_instance_identifier)
498 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
499 inst.db_instance_status = "stopped".to_string();
500 inst.container_id = String::new();
501 inst.clone()
502 };
503
504 self.emit_event(
505 RdsSourceType::DbInstance,
506 &db_instance_identifier,
507 &arn,
508 "RDS-EVENT-0089",
509 &["notification"],
510 "DB instance stopped",
511 );
512
513 Ok(AwsResponse::xml(
514 StatusCode::OK,
515 query_response_xml(
516 "StopDBInstance",
517 RDS_NS,
518 &format!(
519 "<DBInstance>{}</DBInstance>",
520 db_instance_xml(&instance, Some("stopped"))
521 ),
522 &request.request_id,
523 ),
524 ))
525 }
526
527 async fn start_db_instance(
530 &self,
531 request: &AwsRequest,
532 ) -> Result<AwsResponse, AwsServiceError> {
533 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
534
535 let instance = {
539 let accounts = self.state.read();
540 let empty = RdsState::new(&request.account_id, &request.region);
541 let state = accounts.get(&request.account_id).unwrap_or(&empty);
542 state
543 .instances
544 .get(&db_instance_identifier)
545 .cloned()
546 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
547 };
548
549 {
552 let mut accounts = self.state.write();
553 let state = accounts.get_or_create(&request.account_id);
554 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
555 inst.db_instance_status = "starting".to_string();
556 }
557 }
558
559 let running = if let Some(runtime) = self.runtime.as_ref() {
560 Some(
561 runtime
562 .ensure_postgres(
563 &db_instance_identifier,
564 &instance.engine,
565 &instance.engine_version,
566 &instance.master_username,
567 &instance.master_user_password,
568 instance
569 .db_name
570 .as_deref()
571 .unwrap_or(default_db_name(&instance.engine)),
572 &request.account_id,
573 &request.region,
574 )
575 .await
576 .map_err(runtime_error_to_service_error)?,
577 )
578 } else {
579 None
580 };
581
582 let instance = {
583 let mut accounts = self.state.write();
584 let state = accounts.get_or_create(&request.account_id);
585 let inst = state
586 .instances
587 .get_mut(&db_instance_identifier)
588 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
589 inst.db_instance_status = "available".to_string();
590 inst.endpoint_address = "127.0.0.1".to_string();
591 if let Some(r) = running {
592 inst.port = i32::from(r.host_port);
593 inst.host_port = r.host_port;
594 inst.container_id = r.container_id;
595 }
596 inst.clone()
597 };
598
599 self.emit_event(
600 RdsSourceType::DbInstance,
601 &db_instance_identifier,
602 &instance.db_instance_arn,
603 "RDS-EVENT-0088",
604 &["notification"],
605 "DB instance started",
606 );
607
608 Ok(AwsResponse::xml(
609 StatusCode::OK,
610 query_response_xml(
611 "StartDBInstance",
612 RDS_NS,
613 &format!(
614 "<DBInstance>{}</DBInstance>",
615 db_instance_xml(&instance, None)
616 ),
617 &request.request_id,
618 ),
619 ))
620 }
621}
622
/// Reports whether `code` exactly matches (case-sensitively) one of the
/// error codes in the fixed list below.
///
/// NOTE(review): judging by the name, these are presumably the error codes
/// declared for `AddTagsToResource` — confirm against the caller.
fn is_declared_add_tags_not_found(code: &str) -> bool {
    const DECLARED_CODES: &[&str] = &[
        "BlueGreenDeploymentNotFoundFault",
        "DBClusterNotFoundFault",
        "DBInstanceNotFound",
        "DBProxyEndpointNotFoundFault",
        "DBProxyNotFoundFault",
        "DBProxyTargetGroupNotFoundFault",
        "DBShardGroupNotFound",
        "DBSnapshotNotFound",
        "DBSnapshotTenantDatabaseNotFoundFault",
        "IntegrationNotFoundFault",
        "InvalidDBClusterEndpointStateFault",
        "InvalidDBClusterStateFault",
        "InvalidDBInstanceState",
        "TenantDatabaseNotFound",
    ];
    DECLARED_CODES.iter().any(|&known| known == code)
}
649
650async fn save_snapshot_static(
652 state: SharedRdsState,
653 store: Option<Arc<dyn SnapshotStore>>,
654 lock: Arc<AsyncMutex<()>>,
655) {
656 let Some(store) = store else {
657 return;
658 };
659 let _guard = lock.lock().await;
660 let snapshot = RdsSnapshot {
661 schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
662 state: None,
663 accounts: Some(state.read().clone()),
664 };
665 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
666 let bytes = serde_json::to_vec(&snapshot)
667 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
668 store.save(&bytes)
669 })
670 .await;
671 match join {
672 Ok(Ok(())) => {}
673 Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
674 Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
675 }
676}
677
678impl RdsService {
679 fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
690 self.runtime.as_ref().ok_or_else(|| {
691 AwsServiceError::aws_error(
692 StatusCode::SERVICE_UNAVAILABLE,
693 "InsufficientDBInstanceCapacity",
694 "Docker/Podman is required for RDS DB instances but is not available",
695 )
696 })
697 }
698}
699
700#[async_trait]
701impl AwsService for RdsService {
702 fn service_name(&self) -> &str {
703 "rds"
704 }
705
706 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
707 crate::validation::prevalidate(request.action.as_str(), &request)?;
712
713 let mutates = is_mutating_action(request.action.as_str());
714 let result = match request.action.as_str() {
715 "AddTagsToResource" => self.add_tags_to_resource(&request),
716 "CreateDBInstance" => self.create_db_instance(&request).await,
717 "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
718 "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
719 "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
720 "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
721 "DeleteDBInstance" => self.delete_db_instance(&request).await,
722 "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
723 "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
724 "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
725 "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
726 "DescribeDBInstances" => self.describe_db_instances(&request),
727 "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
728 "DescribeDBParameters" => self.describe_db_parameters_real(&request),
729 "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
730 "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
731 "DescribeOrderableDBInstanceOptions" => {
732 self.describe_orderable_db_instance_options(&request)
733 }
734 "ListTagsForResource" => self.list_tags_for_resource(&request),
735 "ModifyDBInstance" => self.modify_db_instance(&request),
736 "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
737 "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
738 "RebootDBInstance" => self.reboot_db_instance(&request).await,
739 "StartDBInstance" => self.start_db_instance(&request).await,
740 "StopDBInstance" => self.stop_db_instance(&request).await,
741 "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
742 "RestoreDBInstanceFromDBSnapshot" => {
743 self.restore_db_instance_from_db_snapshot(&request).await
744 }
745 "RestoreDBInstanceToPointInTime" => {
746 self.restore_db_instance_to_point_in_time(&request).await
747 }
748 "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
749 "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
750 "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
751 "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
752 "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
753 "RestoreDBClusterToPointInTime" => {
754 self.restore_db_cluster_to_point_in_time(&request).await
755 }
756 _ => self.handle_extra_action(&request),
757 };
758 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
759 self.save_snapshot().await;
760 }
761 result
762 }
763
764 fn supported_actions(&self) -> &[&str] {
765 SUPPORTED_ACTIONS
766 }
767}
768
769impl RdsService {
770 async fn create_db_instance(
771 &self,
772 request: &AwsRequest,
773 ) -> Result<AwsResponse, AwsServiceError> {
774 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
775 let db_instance_class = required_query_param(request, "DBInstanceClass")?;
776 let engine = required_query_param(request, "Engine")?;
777 let allocated_storage = optional_i32_param(request, "AllocatedStorage")?
785 .filter(|v| *v > 0)
786 .unwrap_or(20);
787 let master_username =
788 optional_query_param(request, "MasterUsername").unwrap_or_else(|| "admin".to_string());
789 let master_user_password = optional_query_param(request, "MasterUserPassword")
790 .unwrap_or_else(|| "Password1!".to_string());
791 let db_name = optional_query_param(request, "DBName");
792 let engine_version =
793 optional_query_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
794 let publicly_accessible =
795 parse_optional_bool(optional_query_param(request, "PubliclyAccessible").as_deref())?
796 .unwrap_or(true);
797 let deletion_protection =
798 parse_optional_bool(optional_query_param(request, "DeletionProtection").as_deref())?
799 .unwrap_or(false);
800 let port = optional_i32_param(request, "Port")?
801 .unwrap_or_else(|| default_port_for_engine(&engine));
802 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
803
804 let db_parameter_group_name = optional_query_param(request, "DBParameterGroupName")
805 .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
806
807 let backup_retention_period =
808 optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
809 let preferred_backup_window = optional_query_param(request, "PreferredBackupWindow")
810 .unwrap_or_else(|| "03:00-04:00".to_string());
811 let option_group_name = optional_query_param(request, "OptionGroupName");
812 let multi_az = parse_optional_bool(optional_query_param(request, "MultiAZ").as_deref())?
813 .unwrap_or(false);
814 let availability_zone = optional_query_param(request, "AvailabilityZone");
815 let storage_type = optional_query_param(request, "StorageType");
816 let storage_encrypted =
817 parse_optional_bool(optional_query_param(request, "StorageEncrypted").as_deref())?
818 .unwrap_or(false);
819 let kms_key_id = optional_query_param(request, "KmsKeyId");
820 let iam_database_authentication_enabled = parse_optional_bool(
821 optional_query_param(request, "EnableIAMDatabaseAuthentication").as_deref(),
822 )?
823 .unwrap_or(false);
824 let iops = optional_i32_param(request, "Iops")?;
825 let monitoring_interval = optional_i32_param(request, "MonitoringInterval")?;
826 let monitoring_role_arn = optional_query_param(request, "MonitoringRoleArn");
827 let performance_insights_enabled = parse_optional_bool(
828 optional_query_param(request, "EnablePerformanceInsights").as_deref(),
829 )?
830 .unwrap_or(false);
831 let performance_insights_kms_key_id =
832 optional_query_param(request, "PerformanceInsightsKMSKeyId");
833 let performance_insights_retention_period =
834 optional_i32_param(request, "PerformanceInsightsRetentionPeriod")?;
835 let enabled_cloudwatch_logs_exports =
836 parse_cloudwatch_logs_exports(request, "EnableCloudwatchLogsExports");
837 let ca_certificate_identifier = optional_query_param(request, "CACertificateIdentifier");
838 let network_type = optional_query_param(request, "NetworkType");
839 let character_set_name = optional_query_param(request, "CharacterSetName");
840 let auto_minor_version_upgrade = parse_optional_bool(
841 optional_query_param(request, "AutoMinorVersionUpgrade").as_deref(),
842 )?;
843 let copy_tags_to_snapshot =
844 parse_optional_bool(optional_query_param(request, "CopyTagsToSnapshot").as_deref())?;
845 let db_cluster_identifier = optional_query_param(request, "DBClusterIdentifier");
846
847 validate_create_request(
848 &db_instance_identifier,
849 allocated_storage,
850 &db_instance_class,
851 &engine,
852 &engine_version,
853 port,
854 )?;
855
856 {
857 let mut accounts = self.state.write();
858 let state = accounts.get_or_create(&request.account_id);
859 if !state.begin_instance_creation(&db_instance_identifier) {
860 return Err(AwsServiceError::aws_error(
861 StatusCode::BAD_REQUEST,
862 "DBInstanceAlreadyExists",
863 format!("DBInstance {} already exists.", db_instance_identifier),
864 ));
865 }
866 if let Some(ref pg_name) = db_parameter_group_name {
868 if !state.parameter_groups.contains_key(pg_name) {
869 state.cancel_instance_creation(&db_instance_identifier);
870 return Err(AwsServiceError::aws_error(
871 StatusCode::NOT_FOUND,
872 "DBParameterGroupNotFound",
873 format!("DBParameterGroup {} not found.", pg_name),
874 ));
875 }
876 }
877 }
878
879 let runtime = self.require_runtime()?.clone();
880
881 let logical_db_name = db_name
882 .clone()
883 .unwrap_or_else(|| default_db_name(&engine).to_string());
884
885 let created_at = Utc::now();
891 let instance = {
892 let mut accounts = self.state.write();
893 let state = accounts.get_or_create(&request.account_id);
894 let placeholder = DbInstance {
895 db_instance_identifier: db_instance_identifier.clone(),
896 db_instance_arn: state.db_instance_arn(&db_instance_identifier),
897 db_instance_class: db_instance_class.clone(),
898 engine: engine.clone(),
899 engine_version: engine_version.clone(),
900 db_instance_status: "creating".to_string(),
901 master_username: master_username.clone(),
902 db_name: db_name.clone(),
903 endpoint_address: String::new(),
904 port: 0,
905 allocated_storage,
906 publicly_accessible,
907 deletion_protection,
908 created_at,
909 dbi_resource_id: state.next_dbi_resource_id(),
910 master_user_password: master_user_password.clone(),
911 container_id: String::new(),
912 host_port: 0,
913 tags: Vec::new(),
914 read_replica_source_db_instance_identifier: None,
915 read_replica_db_instance_identifiers: Vec::new(),
916 vpc_security_group_ids,
917 db_parameter_group_name,
918 backup_retention_period,
919 preferred_backup_window,
920 preferred_maintenance_window: None,
921 latest_restorable_time: if backup_retention_period > 0 {
922 Some(created_at)
923 } else {
924 None
925 },
926 option_group_name,
927 multi_az,
928 pending_modified_values: None,
929 availability_zone,
930 storage_type,
931 storage_encrypted,
932 kms_key_id,
933 iam_database_authentication_enabled,
934 iops,
935 monitoring_interval,
936 monitoring_role_arn,
937 performance_insights_enabled,
938 performance_insights_kms_key_id,
939 performance_insights_retention_period,
940 enabled_cloudwatch_logs_exports,
941 ca_certificate_identifier,
942 network_type,
943 character_set_name,
944 auto_minor_version_upgrade,
945 copy_tags_to_snapshot,
946 master_user_secret_arn: None,
947 master_user_secret_kms_key_id: None,
948 license_model: None,
949 max_allocated_storage: None,
950 multi_tenant: None,
951 storage_throughput: None,
952 tde_credential_arn: None,
953 delete_automated_backups: None,
954 db_security_groups: Vec::new(),
955 domain: None,
956 domain_fqdn: None,
957 domain_ou: None,
958 domain_iam_role_name: None,
959 domain_auth_secret_arn: None,
960 domain_dns_ips: Vec::new(),
961 db_cluster_identifier: db_cluster_identifier.clone(),
962 };
963 state.finish_instance_creation(placeholder.clone());
964 placeholder
965 };
966 let instance_arn = instance.db_instance_arn.clone();
967
968 self.emit_event(
969 RdsSourceType::DbInstance,
970 &db_instance_identifier,
971 &instance_arn,
972 "RDS-EVENT-0005",
973 &["creation"],
974 "DB instance created",
975 );
976
977 {
978 let state_handle = self.state.clone();
979 let delivery_bus = self.delivery_bus.clone();
980 let runtime = runtime.clone();
981 let id = db_instance_identifier.clone();
982 let engine = engine.clone();
983 let engine_version = engine_version.clone();
984 let master_username = master_username.clone();
985 let master_user_password = master_user_password.clone();
986 let logical_db_name_task = logical_db_name.clone();
987 let account_id = request.account_id.clone();
988 let region = request.region.clone();
989 let arn = instance_arn.clone();
990 let snapshot_store = self.snapshot_store.clone();
991 let snapshot_lock = self.snapshot_lock.clone();
992 let cluster_id_for_attach = db_cluster_identifier.clone();
993 tokio::spawn(async move {
994 match runtime
995 .ensure_postgres(
996 &id,
997 &engine,
998 &engine_version,
999 &master_username,
1000 &master_user_password,
1001 &logical_db_name_task,
1002 &account_id,
1003 ®ion,
1004 )
1005 .await
1006 {
1007 Ok(running) => {
1008 let pending_dump = if let Some(ref cid) = cluster_id_for_attach {
1015 let mut accounts = state_handle.write();
1016 let state = accounts.get_or_create(&account_id);
1017 state
1018 .extras
1019 .get_mut("clusters")
1020 .and_then(|m| m.get_mut(cid))
1021 .and_then(|entry| entry.as_object_mut())
1022 .and_then(|obj| obj.remove("PendingRestoreDumpB64"))
1023 .and_then(|v| v.as_str().map(str::to_string))
1024 .and_then(|b64| {
1025 use base64::Engine;
1026 base64::engine::general_purpose::STANDARD
1027 .decode(b64.as_bytes())
1028 .ok()
1029 })
1030 } else {
1031 None
1032 };
1033 if let Some(dump) = pending_dump {
1034 if let Err(error) = runtime
1035 .restore_database(
1036 &id,
1037 &engine,
1038 &master_username,
1039 &master_user_password,
1040 &logical_db_name_task,
1041 &dump,
1042 )
1043 .await
1044 {
1045 tracing::error!(%error, db_instance_identifier=%id, "cluster restore dump replay failed");
1046 }
1047 }
1048
1049 {
1050 let mut accounts = state_handle.write();
1051 let state = accounts.get_or_create(&account_id);
1052 if let Some(inst) = state.instances.get_mut(&id) {
1053 inst.db_instance_status = "available".to_string();
1054 inst.endpoint_address = "127.0.0.1".to_string();
1055 inst.port = i32::from(running.host_port);
1056 inst.host_port = running.host_port;
1057 inst.container_id = running.container_id;
1058 }
1059 if let Some(ref cid) = cluster_id_for_attach {
1062 attach_cluster_member(state, cid, &id);
1063 }
1064 }
1065 save_snapshot_static(
1070 state_handle.clone(),
1071 snapshot_store.clone(),
1072 snapshot_lock.clone(),
1073 )
1074 .await;
1075 }
1076 Err(error) => {
1077 tracing::error!(%error, db_instance_identifier=%id, "create_db_instance background task failed");
1078 {
1079 let mut accounts = state_handle.write();
1080 let state = accounts.get_or_create(&account_id);
1081 state.instances.remove(&id);
1082 }
1083 save_snapshot_static(
1084 state_handle.clone(),
1085 snapshot_store.clone(),
1086 snapshot_lock.clone(),
1087 )
1088 .await;
1089 emit_event_static(
1090 delivery_bus.as_ref(),
1091 RdsSourceType::DbInstance,
1092 &id,
1093 &arn,
1094 "RDS-EVENT-0058",
1095 &["failure"],
1096 &format!("DB instance failed to create: {}", error),
1097 );
1098 }
1099 }
1100 });
1101 }
1102
1103 Ok(AwsResponse::xml(
1104 StatusCode::OK,
1105 query_response_xml(
1106 "CreateDBInstance",
1107 RDS_NS,
1108 &format!(
1109 "<DBInstance>{}</DBInstance>",
1110 db_instance_xml(&instance, None)
1111 ),
1112 &request.request_id,
1113 ),
1114 ))
1115 }
1116
1117 async fn delete_db_instance(
1118 &self,
1119 request: &AwsRequest,
1120 ) -> Result<AwsResponse, AwsServiceError> {
1121 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
1122 let skip_final_snapshot =
1123 parse_optional_bool(optional_query_param(request, "SkipFinalSnapshot").as_deref())?
1124 .unwrap_or(false);
1125 let final_db_snapshot_identifier =
1126 optional_query_param(request, "FinalDBSnapshotIdentifier");
1127
1128 if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
1129 return Err(AwsServiceError::aws_error(
1130 StatusCode::BAD_REQUEST,
1131 "InvalidDBInstanceState",
1132 "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
1133 ));
1134 }
1135 if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
1136 return Err(AwsServiceError::aws_error(
1137 StatusCode::BAD_REQUEST,
1138 "InvalidDBInstanceState",
1139 "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
1140 ));
1141 }
1142
1143 {
1145 let accounts = self.state.read();
1146 let empty = RdsState::new(&request.account_id, &request.region);
1147 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1148 if let Some(instance) = state.instances.get(&db_instance_identifier) {
1149 if instance.deletion_protection {
1150 return Err(AwsServiceError::aws_error(
1151 StatusCode::BAD_REQUEST,
1152 "InvalidDBInstanceState",
1153 format!(
1154 "DBInstance {} cannot be deleted because deletion protection is enabled.",
1155 db_instance_identifier
1156 ),
1157 ));
1158 }
1159 } else {
1160 return Err(db_instance_not_found(&db_instance_identifier));
1161 }
1162 }
1163
1164 if let Some(ref snapshot_id) = final_db_snapshot_identifier {
1165 self.create_final_db_snapshot(
1166 &db_instance_identifier,
1167 snapshot_id,
1168 &request.account_id,
1169 &request.region,
1170 )
1171 .await?;
1172 }
1173
1174 let instance = {
1175 let mut accounts = self.state.write();
1176 let state = accounts.get_or_create(&request.account_id);
1177 let instance = state
1178 .instances
1179 .remove(&db_instance_identifier)
1180 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1181
1182 if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
1183 if let Some(source) = state.instances.get_mut(source_id) {
1184 source
1185 .read_replica_db_instance_identifiers
1186 .retain(|id| id != &db_instance_identifier);
1187 }
1188 }
1189
1190 for replica_id in &instance.read_replica_db_instance_identifiers {
1191 if let Some(replica) = state.instances.get_mut(replica_id) {
1192 replica.read_replica_source_db_instance_identifier = None;
1193 }
1194 }
1195
1196 instance
1197 };
1198
1199 if let Some(runtime) = &self.runtime {
1200 runtime.stop_container(&db_instance_identifier).await;
1201 }
1202
1203 self.emit_event(
1204 RdsSourceType::DbInstance,
1205 &db_instance_identifier,
1206 &instance.db_instance_arn,
1207 "RDS-EVENT-0003",
1208 &["deletion"],
1209 "DB instance deleted",
1210 );
1211
1212 Ok(AwsResponse::xml(
1213 StatusCode::OK,
1214 query_response_xml(
1215 "DeleteDBInstance",
1216 RDS_NS,
1217 &format!(
1218 "<DBInstance>{}</DBInstance>",
1219 db_instance_xml(&instance, Some("deleting"))
1220 ),
1221 &request.request_id,
1222 ),
1223 ))
1224 }
1225
1226 async fn create_final_db_snapshot(
1232 &self,
1233 db_instance_identifier: &str,
1234 snapshot_id: &str,
1235 account_id: &str,
1236 region: &str,
1237 ) -> Result<(), AwsServiceError> {
1238 let runtime = self.runtime.as_ref().ok_or_else(|| {
1239 AwsServiceError::aws_error(
1240 StatusCode::SERVICE_UNAVAILABLE,
1241 "InvalidDBSnapshotState",
1242 "Docker/Podman is required for RDS snapshots but is not available",
1243 )
1244 })?;
1245
1246 let (instance_for_snapshot, db_name) = {
1247 let accounts = self.state.read();
1248 let empty = RdsState::new(account_id, region);
1249 let state = accounts.get(account_id).unwrap_or(&empty);
1250
1251 if state.snapshots.contains_key(snapshot_id) {
1252 return Err(AwsServiceError::aws_error(
1253 StatusCode::CONFLICT,
1254 "DBSnapshotAlreadyExists",
1255 format!("DBSnapshot {snapshot_id} already exists."),
1256 ));
1257 }
1258
1259 let instance = state
1260 .instances
1261 .get(db_instance_identifier)
1262 .cloned()
1263 .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
1264
1265 let default_db = default_db_name(&instance.engine);
1266 let db_name = instance
1267 .db_name
1268 .as_deref()
1269 .unwrap_or(default_db)
1270 .to_string();
1271
1272 (instance, db_name)
1273 };
1274
1275 let dump_data = runtime
1276 .dump_database(
1277 db_instance_identifier,
1278 &instance_for_snapshot.engine,
1279 &instance_for_snapshot.master_username,
1280 &instance_for_snapshot.master_user_password,
1281 &db_name,
1282 )
1283 .await
1284 .map_err(runtime_error_to_service_error)?;
1285
1286 let mut accounts = self.state.write();
1287 let state = accounts.get_or_create(account_id);
1288
1289 if state.snapshots.contains_key(snapshot_id) {
1290 return Err(AwsServiceError::aws_error(
1291 StatusCode::CONFLICT,
1292 "DBSnapshotAlreadyExists",
1293 format!("DBSnapshot {snapshot_id} already exists."),
1294 ));
1295 }
1296
1297 let snapshot_arn = state.db_snapshot_arn(snapshot_id);
1298
1299 let snapshot = DbSnapshot {
1300 db_snapshot_identifier: snapshot_id.to_string(),
1301 db_snapshot_arn: snapshot_arn,
1302 db_instance_identifier: db_instance_identifier.to_string(),
1303 snapshot_create_time: Utc::now(),
1304 engine: instance_for_snapshot.engine.clone(),
1305 engine_version: instance_for_snapshot.engine_version.clone(),
1306 allocated_storage: instance_for_snapshot.allocated_storage,
1307 status: "available".to_string(),
1308 port: instance_for_snapshot.port,
1309 master_username: instance_for_snapshot.master_username.clone(),
1310 db_name: instance_for_snapshot.db_name.clone(),
1311 dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
1312 snapshot_type: "automated".to_string(),
1313 master_user_password: instance_for_snapshot.master_user_password.clone(),
1314 tags: Vec::new(),
1315 dump_data,
1316 availability_zone: instance_for_snapshot.availability_zone.clone(),
1317 vpc_id: None,
1318 instance_create_time: Some(instance_for_snapshot.created_at),
1319 license_model: Some(
1320 service_helpers::license_model_for_engine(&instance_for_snapshot.engine)
1321 .to_string(),
1322 ),
1323 iops: instance_for_snapshot.iops,
1324 option_group_name: instance_for_snapshot.option_group_name.clone(),
1325 percent_progress: Some(100),
1326 storage_type: instance_for_snapshot.storage_type.clone(),
1327 encrypted: instance_for_snapshot.storage_encrypted,
1328 kms_key_id: instance_for_snapshot.kms_key_id.clone(),
1329 iam_database_authentication_enabled: instance_for_snapshot
1330 .iam_database_authentication_enabled,
1331 timezone: None,
1332 storage_throughput: None,
1333 };
1334
1335 state.snapshots.insert(snapshot_id.to_string(), snapshot);
1336 Ok(())
1337 }
1338
1339 fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1340 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
1341 let apply_immediately =
1342 parse_optional_bool(optional_query_param(request, "ApplyImmediately").as_deref())?;
1343
1344 let deletion_protection =
1347 parse_optional_bool(optional_query_param(request, "DeletionProtection").as_deref())?;
1348 let backup_retention_period =
1349 parse_optional_i32(optional_query_param(request, "BackupRetentionPeriod").as_deref())?;
1350 let preferred_backup_window = optional_query_param(request, "PreferredBackupWindow");
1351 let preferred_maintenance_window =
1352 optional_query_param(request, "PreferredMaintenanceWindow");
1353 let db_parameter_group_name = optional_query_param(request, "DBParameterGroupName");
1354 let master_user_secret_kms_key_id =
1355 optional_query_param(request, "MasterUserSecretKmsKeyId");
1356 let ca_certificate_identifier = optional_query_param(request, "CACertificateIdentifier");
1357 let monitoring_interval =
1358 parse_optional_i32(optional_query_param(request, "MonitoringInterval").as_deref())?;
1359 let option_group_name = optional_query_param(request, "OptionGroupName");
1360 let auto_minor_version_upgrade = parse_optional_bool(
1361 optional_query_param(request, "AutoMinorVersionUpgrade").as_deref(),
1362 )?;
1363 let copy_tags_to_snapshot =
1364 parse_optional_bool(optional_query_param(request, "CopyTagsToSnapshot").as_deref())?;
1365 let delete_automated_backups = parse_optional_bool(
1366 optional_query_param(request, "DeleteAutomatedBackups").as_deref(),
1367 )?;
1368 let enable_iam_db_auth = parse_optional_bool(
1369 optional_query_param(request, "EnableIAMDatabaseAuthentication").as_deref(),
1370 )?;
1371 let max_allocated_storage =
1372 parse_optional_i32(optional_query_param(request, "MaxAllocatedStorage").as_deref())?;
1373 let network_type = optional_query_param(request, "NetworkType");
1374 let domain = optional_query_param(request, "Domain");
1375 let domain_fqdn = optional_query_param(request, "DomainFqdn");
1376 let domain_ou = optional_query_param(request, "DomainOu");
1377 let domain_iam_role_name = optional_query_param(request, "DomainIAMRoleName");
1378 let domain_auth_secret_arn = optional_query_param(request, "DomainAuthSecretArn");
1379 let domain_dns_ips = {
1380 let v = parse_string_member_list(request, "DomainDnsIps");
1381 if v.is_empty() {
1382 None
1383 } else {
1384 Some(v)
1385 }
1386 };
1387 let disable_domain =
1388 parse_optional_bool(optional_query_param(request, "DisableDomain").as_deref())?;
1389 let rotate_master_user_password = parse_optional_bool(
1390 optional_query_param(request, "RotateMasterUserPassword").as_deref(),
1391 )?;
1392
1393 let db_instance_class = optional_query_param(request, "DBInstanceClass");
1394 let master_user_password = optional_query_param(request, "MasterUserPassword");
1395 let engine_version = optional_query_param(request, "EngineVersion");
1396 let allocated_storage =
1397 parse_optional_i32(optional_query_param(request, "AllocatedStorage").as_deref())?;
1398 let multi_az = parse_optional_bool(optional_query_param(request, "MultiAZ").as_deref())?;
1399 let iops = parse_optional_i32(optional_query_param(request, "Iops").as_deref())?;
1400 let storage_type = optional_query_param(request, "StorageType");
1401 let storage_throughput =
1402 parse_optional_i32(optional_query_param(request, "StorageThroughput").as_deref())?;
1403 let performance_insights_enabled = parse_optional_bool(
1404 optional_query_param(request, "EnablePerformanceInsights").as_deref(),
1405 )?;
1406 let license_model = optional_query_param(request, "LicenseModel");
1407 let multi_tenant =
1408 parse_optional_bool(optional_query_param(request, "MultiTenant").as_deref())?;
1409 let publicly_accessible =
1410 parse_optional_bool(optional_query_param(request, "PubliclyAccessible").as_deref())?;
1411 let tde_credential_arn = optional_query_param(request, "TdeCredentialArn");
1412 let db_port_number =
1413 parse_optional_i32(optional_query_param(request, "DBPortNumber").as_deref())?;
1414
1415 let cloudwatch_enable = collect_cloudwatch_log_types(request, "EnableLogTypes");
1420 let cloudwatch_disable = collect_cloudwatch_log_types(request, "DisableLogTypes");
1421 let cloudwatch_changed = !cloudwatch_enable.is_empty() || !cloudwatch_disable.is_empty();
1422
1423 let vpc_security_group_ids = {
1425 let mut ids = Vec::new();
1426 for index in 1.. {
1427 let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
1428 match optional_query_param(request, &sg_id_name) {
1429 Some(sg_id) => ids.push(sg_id),
1430 None => break,
1431 }
1432 }
1433 if ids.is_empty() {
1434 None
1435 } else {
1436 Some(ids)
1437 }
1438 };
1439
1440 let db_security_groups = {
1443 let mut ids = Vec::new();
1444 for index in 1.. {
1445 let key = format!("DBSecurityGroups.DBSecurityGroupName.{index}");
1446 match optional_query_param(request, &key) {
1447 Some(name) => ids.push(name),
1448 None => break,
1449 }
1450 }
1451 if ids.is_empty() {
1452 None
1453 } else {
1454 Some(ids)
1455 }
1456 };
1457
1458 if let Some(ref class) = db_instance_class {
1459 validate_db_instance_class(class)?;
1460 }
1461
1462 let any_mutable_field = db_instance_class.is_some()
1466 || deletion_protection.is_some()
1467 || vpc_security_group_ids.is_some()
1468 || db_security_groups.is_some()
1469 || master_user_password.is_some()
1470 || backup_retention_period.is_some()
1471 || preferred_backup_window.is_some()
1472 || preferred_maintenance_window.is_some()
1473 || engine_version.is_some()
1474 || allocated_storage.is_some()
1475 || db_parameter_group_name.is_some()
1476 || multi_az.is_some()
1477 || iops.is_some()
1478 || storage_type.is_some()
1479 || storage_throughput.is_some()
1480 || master_user_secret_kms_key_id.is_some()
1481 || ca_certificate_identifier.is_some()
1482 || monitoring_interval.is_some()
1483 || performance_insights_enabled.is_some()
1484 || cloudwatch_changed
1485 || option_group_name.is_some()
1486 || auto_minor_version_upgrade.is_some()
1487 || copy_tags_to_snapshot.is_some()
1488 || delete_automated_backups.is_some()
1489 || enable_iam_db_auth.is_some()
1490 || max_allocated_storage.is_some()
1491 || network_type.is_some()
1492 || license_model.is_some()
1493 || multi_tenant.is_some()
1494 || publicly_accessible.is_some()
1495 || tde_credential_arn.is_some()
1496 || db_port_number.is_some()
1497 || domain.is_some()
1498 || domain_fqdn.is_some()
1499 || domain_ou.is_some()
1500 || domain_iam_role_name.is_some()
1501 || domain_auth_secret_arn.is_some()
1502 || domain_dns_ips.is_some()
1503 || disable_domain.is_some()
1504 || rotate_master_user_password.is_some();
1505 let mut accounts = self.state.write();
1506 let state = accounts.get_or_create(&request.account_id);
1507 let instance = state
1508 .instances
1509 .get_mut(&db_instance_identifier)
1510 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1511
1512 let _ = any_mutable_field;
1518
1519 if let Some(deletion_protection) = deletion_protection {
1521 instance.deletion_protection = deletion_protection;
1522 }
1523 if let Some(security_group_ids) = vpc_security_group_ids {
1524 instance.vpc_security_group_ids = security_group_ids;
1525 }
1526 if let Some(sg_names) = db_security_groups {
1527 instance.db_security_groups = sg_names;
1528 }
1529 if let Some(ca_id) = ca_certificate_identifier {
1530 instance.ca_certificate_identifier = Some(ca_id);
1531 }
1532 if let Some(kms_key) = master_user_secret_kms_key_id {
1533 instance.master_user_secret_kms_key_id = Some(kms_key);
1534 }
1535 if let Some(name) = option_group_name {
1536 instance.option_group_name = Some(name);
1537 }
1538 if let Some(b) = auto_minor_version_upgrade {
1539 instance.auto_minor_version_upgrade = Some(b);
1540 }
1541 if let Some(b) = copy_tags_to_snapshot {
1542 instance.copy_tags_to_snapshot = Some(b);
1543 }
1544 if let Some(b) = delete_automated_backups {
1545 instance.delete_automated_backups = Some(b);
1546 }
1547 if let Some(b) = enable_iam_db_auth {
1548 instance.iam_database_authentication_enabled = b;
1549 }
1550 if let Some(n) = max_allocated_storage {
1551 instance.max_allocated_storage = Some(n);
1552 }
1553 if let Some(nt) = network_type {
1554 instance.network_type = Some(nt);
1555 }
1556 if disable_domain == Some(true) {
1557 instance.domain = None;
1558 instance.domain_fqdn = None;
1559 instance.domain_ou = None;
1560 instance.domain_iam_role_name = None;
1561 instance.domain_auth_secret_arn = None;
1562 instance.domain_dns_ips.clear();
1563 } else {
1564 if let Some(v) = domain {
1565 instance.domain = Some(v);
1566 }
1567 if let Some(v) = domain_fqdn {
1568 instance.domain_fqdn = Some(v);
1569 }
1570 if let Some(v) = domain_ou {
1571 instance.domain_ou = Some(v);
1572 }
1573 if let Some(v) = domain_iam_role_name {
1574 instance.domain_iam_role_name = Some(v);
1575 }
1576 if let Some(v) = domain_auth_secret_arn {
1577 instance.domain_auth_secret_arn = Some(v);
1578 }
1579 if let Some(v) = domain_dns_ips {
1580 instance.domain_dns_ips = v;
1581 }
1582 }
1583 if cloudwatch_changed {
1584 let mut current: Vec<String> = instance.enabled_cloudwatch_logs_exports.clone();
1585 current.retain(|t| !cloudwatch_disable.contains(t));
1586 for t in &cloudwatch_enable {
1587 if !current.contains(t) {
1588 current.push(t.clone());
1589 }
1590 }
1591 instance.enabled_cloudwatch_logs_exports = current;
1592 }
1593 if rotate_master_user_password == Some(true) {
1597 instance.master_user_password = format!("rotated-{}", uuid::Uuid::new_v4().simple());
1598 }
1599
1600 let immediate = apply_immediately != Some(false);
1602 if immediate {
1603 if let Some(class) = db_instance_class {
1604 instance.db_instance_class = class;
1605 }
1606 if let Some(pwd) = master_user_password {
1607 instance.master_user_password = pwd;
1608 }
1609 if let Some(version) = engine_version {
1610 instance.engine_version = version;
1611 }
1612 if let Some(storage) = allocated_storage {
1613 instance.allocated_storage = storage;
1614 }
1615 if let Some(name) = db_parameter_group_name {
1616 instance.db_parameter_group_name = Some(name);
1617 }
1618 if let Some(az) = multi_az {
1619 instance.multi_az = az;
1620 }
1621 if let Some(iops_val) = iops {
1622 instance.iops = Some(iops_val);
1623 }
1624 if let Some(stype) = storage_type {
1625 instance.storage_type = Some(stype);
1626 }
1627 if let Some(t) = storage_throughput {
1628 instance.storage_throughput = Some(t);
1629 }
1630 if let Some(pi) = performance_insights_enabled {
1631 instance.performance_insights_enabled = pi;
1632 }
1633 if let Some(lm) = license_model {
1634 instance.license_model = Some(lm);
1635 }
1636 if let Some(b) = multi_tenant {
1637 instance.multi_tenant = Some(b);
1638 }
1639 if let Some(b) = publicly_accessible {
1640 instance.publicly_accessible = b;
1641 }
1642 if let Some(arn) = tde_credential_arn {
1643 instance.tde_credential_arn = Some(arn);
1644 }
1645 if let Some(p) = db_port_number {
1646 instance.port = p;
1647 }
1648 if let Some(retention) = backup_retention_period {
1649 instance.backup_retention_period = retention;
1650 }
1651 if let Some(window) = preferred_backup_window {
1652 instance.preferred_backup_window = window;
1653 }
1654 if let Some(window) = preferred_maintenance_window {
1655 instance.preferred_maintenance_window = Some(window);
1656 }
1657 if let Some(interval) = monitoring_interval {
1658 instance.monitoring_interval = Some(interval);
1659 }
1660 } else {
1661 let any_deferred = db_instance_class.is_some()
1662 || master_user_password.is_some()
1663 || engine_version.is_some()
1664 || allocated_storage.is_some()
1665 || db_parameter_group_name.is_some()
1666 || multi_az.is_some()
1667 || iops.is_some()
1668 || storage_type.is_some()
1669 || storage_throughput.is_some()
1670 || performance_insights_enabled.is_some()
1671 || license_model.is_some()
1672 || multi_tenant.is_some()
1673 || publicly_accessible.is_some()
1674 || tde_credential_arn.is_some()
1675 || db_port_number.is_some()
1676 || backup_retention_period.is_some()
1677 || preferred_backup_window.is_some()
1678 || preferred_maintenance_window.is_some()
1679 || monitoring_interval.is_some();
1680 if any_deferred {
1681 let pending = instance
1682 .pending_modified_values
1683 .get_or_insert(Default::default());
1684 if let Some(class) = db_instance_class {
1685 pending.db_instance_class = Some(class);
1686 }
1687 if let Some(pwd) = master_user_password {
1688 pending.master_user_password = Some(pwd);
1689 }
1690 if let Some(version) = engine_version {
1691 pending.engine_version = Some(version);
1692 }
1693 if let Some(storage) = allocated_storage {
1694 pending.allocated_storage = Some(storage);
1695 }
1696 if let Some(name) = db_parameter_group_name {
1697 pending.db_parameter_group_name = Some(name);
1698 }
1699 if let Some(az) = multi_az {
1700 pending.multi_az = Some(az);
1701 }
1702 if let Some(iops_val) = iops {
1703 pending.iops = Some(iops_val);
1704 }
1705 if let Some(stype) = storage_type {
1706 pending.storage_type = Some(stype);
1707 }
1708 if let Some(t) = storage_throughput {
1709 pending.storage_throughput = Some(t);
1710 }
1711 if let Some(pi) = performance_insights_enabled {
1712 pending.performance_insights_enabled = Some(pi);
1713 }
1714 if let Some(lm) = license_model {
1715 pending.license_model = Some(lm);
1716 }
1717 if let Some(b) = multi_tenant {
1718 pending.multi_tenant = Some(b);
1719 }
1720 if let Some(b) = publicly_accessible {
1721 pending.publicly_accessible = Some(b);
1722 }
1723 if let Some(arn) = tde_credential_arn {
1724 pending.tde_credential_arn = Some(arn);
1725 }
1726 if let Some(p) = db_port_number {
1727 pending.port = Some(p);
1728 }
1729 if let Some(retention) = backup_retention_period {
1730 pending.backup_retention_period = Some(retention);
1731 }
1732 if let Some(window) = preferred_backup_window {
1733 pending.preferred_backup_window = Some(window);
1734 }
1735 if let Some(window) = preferred_maintenance_window {
1736 pending.preferred_maintenance_window = Some(window);
1737 }
1738 if let Some(interval) = monitoring_interval {
1739 pending.monitoring_interval = Some(interval);
1740 }
1741 }
1742 }
1743 let instance_arn = instance.db_instance_arn.clone();
1744 let xml = query_response_xml(
1745 "ModifyDBInstance",
1746 RDS_NS,
1747 &format!(
1748 "<DBInstance>{}</DBInstance>",
1749 db_instance_xml(instance, Some("modifying"))
1750 ),
1751 &request.request_id,
1752 );
1753 drop(accounts);
1754
1755 self.emit_event(
1756 RdsSourceType::DbInstance,
1757 &db_instance_identifier,
1758 &instance_arn,
1759 "RDS-EVENT-0014",
1760 &["configuration change"],
1761 "DB instance was modified",
1762 );
1763
1764 Ok(AwsResponse::xml(StatusCode::OK, xml))
1765 }
1766
1767 async fn reboot_db_instance(
1768 &self,
1769 request: &AwsRequest,
1770 ) -> Result<AwsResponse, AwsServiceError> {
1771 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
1772 let force_failover =
1773 parse_optional_bool(optional_query_param(request, "ForceFailover").as_deref())?;
1774 if force_failover == Some(true) {
1775 return Err(AwsServiceError::aws_error(
1776 StatusCode::BAD_REQUEST,
1777 "InvalidDBInstanceState",
1778 "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
1779 ));
1780 }
1781
1782 let instance = {
1783 let accounts = self.state.read();
1784 let empty = RdsState::new(&request.account_id, &request.region);
1785 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1786 state
1787 .instances
1788 .get(&db_instance_identifier)
1789 .cloned()
1790 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
1791 };
1792
1793 let runtime = self.require_runtime()?;
1794
1795 let running = runtime
1796 .restart_container(
1797 &db_instance_identifier,
1798 &instance.engine,
1799 &instance.master_username,
1800 &instance.master_user_password,
1801 instance
1802 .db_name
1803 .as_deref()
1804 .unwrap_or(default_db_name(&instance.engine)),
1805 )
1806 .await
1807 .map_err(runtime_error_to_service_error)?;
1808
1809 let instance = {
1810 let mut accounts = self.state.write();
1811 let state = accounts.get_or_create(&request.account_id);
1812 let instance = state
1813 .instances
1814 .get_mut(&db_instance_identifier)
1815 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1816 instance.host_port = running.host_port;
1817 instance.port = i32::from(running.host_port);
1818
1819 if let Some(pending) = instance.pending_modified_values.take() {
1821 apply_pending_to_instance(instance, pending);
1822 }
1823
1824 instance.clone()
1825 };
1826
1827 self.emit_event(
1828 RdsSourceType::DbInstance,
1829 &db_instance_identifier,
1830 &instance.db_instance_arn,
1831 "RDS-EVENT-0006",
1832 &["availability"],
1833 "DB instance restarted",
1834 );
1835
1836 Ok(AwsResponse::xml(
1837 StatusCode::OK,
1838 query_response_xml(
1839 "RebootDBInstance",
1840 RDS_NS,
1841 &format!(
1842 "<DBInstance>{}</DBInstance>",
1843 db_instance_xml(&instance, Some("rebooting"))
1844 ),
1845 &request.request_id,
1846 ),
1847 ))
1848 }
1849
1850 fn describe_db_engine_versions(
1851 &self,
1852 request: &AwsRequest,
1853 ) -> Result<AwsResponse, AwsServiceError> {
1854 let engine = optional_query_param(request, "Engine");
1855 let engine_version = optional_query_param(request, "EngineVersion");
1856 let family = optional_query_param(request, "DBParameterGroupFamily");
1857 let default_only =
1858 parse_optional_bool(optional_query_param(request, "DefaultOnly").as_deref())?;
1859
1860 let mut versions = filter_engine_versions(
1861 &default_engine_versions(),
1862 &engine,
1863 &engine_version,
1864 &family,
1865 );
1866
1867 if default_only.unwrap_or(false) {
1868 versions.truncate(1);
1869 }
1870
1871 Ok(AwsResponse::xml(
1872 StatusCode::OK,
1873 query_response_xml(
1874 "DescribeDBEngineVersions",
1875 RDS_NS,
1876 &format!(
1877 "<DBEngineVersions>{}</DBEngineVersions>",
1878 versions.iter().map(engine_version_xml).collect::<String>()
1879 ),
1880 &request.request_id,
1881 ),
1882 ))
1883 }
1884
1885 fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1886 let db_instance_identifier = optional_query_param(request, "DBInstanceIdentifier");
1887 let marker = optional_query_param(request, "Marker");
1888 let max_records = optional_query_param(request, "MaxRecords");
1889
1890 let accounts = self.state.read();
1891 let empty = RdsState::new(&request.account_id, &request.region);
1892 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1893
1894 if let Some(identifier) = db_instance_identifier {
1896 let instance = state
1897 .instances
1898 .get(&identifier)
1899 .cloned()
1900 .ok_or_else(|| db_instance_not_found(&identifier))?;
1901
1902 return Ok(AwsResponse::xml(
1903 StatusCode::OK,
1904 query_response_xml(
1905 "DescribeDBInstances",
1906 RDS_NS,
1907 &format!(
1908 "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
1909 db_instance_xml(&instance, None)
1910 ),
1911 &request.request_id,
1912 ),
1913 ));
1914 }
1915
1916 let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
1918 instances.sort_by(|a, b| {
1919 a.created_at
1920 .cmp(&b.created_at)
1921 .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
1922 });
1923
1924 let paginated = paginate(instances, marker, max_records, |inst| {
1926 &inst.db_instance_identifier
1927 })?;
1928
1929 let marker_xml = paginated
1930 .next_marker
1931 .as_ref()
1932 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1933 .unwrap_or_default();
1934
1935 Ok(AwsResponse::xml(
1936 StatusCode::OK,
1937 query_response_xml(
1938 "DescribeDBInstances",
1939 RDS_NS,
1940 &format!(
1941 "<DBInstances>{}</DBInstances>{}",
1942 paginated
1943 .items
1944 .iter()
1945 .map(|instance| {
1946 format!(
1947 "<DBInstance>{}</DBInstance>",
1948 db_instance_xml(instance, None)
1949 )
1950 })
1951 .collect::<String>(),
1952 marker_xml
1953 ),
1954 &request.request_id,
1955 ),
1956 ))
1957 }
1958
1959 fn describe_orderable_db_instance_options(
1960 &self,
1961 request: &AwsRequest,
1962 ) -> Result<AwsResponse, AwsServiceError> {
1963 let engine = optional_query_param(request, "Engine");
1964 let engine_version = optional_query_param(request, "EngineVersion");
1965 let db_instance_class = optional_query_param(request, "DBInstanceClass");
1966 let license_model = optional_query_param(request, "LicenseModel");
1967 let vpc = parse_optional_bool(optional_query_param(request, "Vpc").as_deref())?;
1968
1969 let options = filter_orderable_options(
1970 &default_orderable_options(),
1971 &engine,
1972 &engine_version,
1973 &db_instance_class,
1974 &license_model,
1975 vpc,
1976 );
1977
1978 Ok(AwsResponse::xml(
1979 StatusCode::OK,
1980 query_response_xml(
1981 "DescribeOrderableDBInstanceOptions",
1982 RDS_NS,
1983 &format!(
1984 "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
1985 options.iter().map(orderable_option_xml).collect::<String>()
1986 ),
1987 &request.request_id,
1988 ),
1989 ))
1990 }
1991
1992 fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1993 let resource_name = required_query_param(request, "ResourceName")?;
1994 let tags = parse_tags(request)?;
1995
1996 let mut accounts = self.state.write();
2001 let state = accounts.get_or_create(&request.account_id);
2002 let mut target = match resolve_tag_target_mut(state, &resource_name) {
2003 Ok(t) => t,
2004 Err(e) => {
2005 if is_declared_add_tags_not_found(e.code()) {
2011 return Err(e);
2012 }
2013 return Ok(AwsResponse::xml(
2014 StatusCode::OK,
2015 query_response_xml("AddTagsToResource", RDS_NS, "", &request.request_id),
2016 ));
2017 }
2018 };
2019 target.merge(&tags);
2020
2021 Ok(AwsResponse::xml(
2022 StatusCode::OK,
2023 query_response_xml("AddTagsToResource", RDS_NS, "", &request.request_id),
2024 ))
2025 }
2026
2027 fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2028 let resource_name = required_query_param(request, "ResourceName")?;
2029 let _ignored_filters = query_param_prefix_exists(request, "Filters.");
2034
2035 let accounts = self.state.read();
2036 let empty = RdsState::new(&request.account_id, &request.region);
2037 let state = accounts.get(&request.account_id).unwrap_or(&empty);
2038 let target = resolve_tag_target(state, &resource_name)?;
2039 let tag_xml = target.to_xml();
2040
2041 Ok(AwsResponse::xml(
2042 StatusCode::OK,
2043 query_response_xml(
2044 "ListTagsForResource",
2045 RDS_NS,
2046 &format!("<TagList>{tag_xml}</TagList>"),
2047 &request.request_id,
2048 ),
2049 ))
2050 }
2051
2052 fn remove_tags_from_resource(
2053 &self,
2054 request: &AwsRequest,
2055 ) -> Result<AwsResponse, AwsServiceError> {
2056 let resource_name = required_query_param(request, "ResourceName")?;
2057 let tag_keys = parse_tag_keys(request)?;
2058
2059 let mut accounts = self.state.write();
2065 let state = accounts.get_or_create(&request.account_id);
2066 let mut target = resolve_tag_target_mut(state, &resource_name)?;
2067 target.remove_keys(&tag_keys);
2068
2069 Ok(AwsResponse::xml(
2070 StatusCode::OK,
2071 query_response_xml("RemoveTagsFromResource", RDS_NS, "", &request.request_id),
2072 ))
2073 }
2074
2075 async fn create_db_snapshot(
2076 &self,
2077 request: &AwsRequest,
2078 ) -> Result<AwsResponse, AwsServiceError> {
2079 let db_snapshot_identifier = required_query_param(request, "DBSnapshotIdentifier")?;
2080 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
2081
2082 let (instance, db_name) = {
2083 let accounts = self.state.read();
2084 let empty = RdsState::new(&request.account_id, &request.region);
2085 let state = accounts.get(&request.account_id).unwrap_or(&empty);
2086
2087 if state.snapshots.contains_key(&db_snapshot_identifier) {
2088 return Err(AwsServiceError::aws_error(
2089 StatusCode::CONFLICT,
2090 "DBSnapshotAlreadyExists",
2091 format!("DBSnapshot {db_snapshot_identifier} already exists."),
2092 ));
2093 }
2094
2095 let instance = state
2096 .instances
2097 .get(&db_instance_identifier)
2098 .cloned()
2099 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
2100
2101 let default_db = default_db_name(&instance.engine);
2102 let db_name = instance
2103 .db_name
2104 .as_deref()
2105 .unwrap_or(default_db)
2106 .to_string();
2107
2108 (instance, db_name)
2109 };
2110
2111 let runtime = self.runtime.as_ref().ok_or_else(|| {
2115 AwsServiceError::aws_error(
2116 StatusCode::SERVICE_UNAVAILABLE,
2117 "InvalidDBInstanceState",
2118 "Docker/Podman is required for RDS snapshots but is not available",
2119 )
2120 })?;
2121
2122 let dump_data = runtime
2123 .dump_database(
2124 &db_instance_identifier,
2125 &instance.engine,
2126 &instance.master_username,
2127 &instance.master_user_password,
2128 &db_name,
2129 )
2130 .await
2131 .map_err(runtime_error_to_service_error)?;
2132
2133 let mut accounts = self.state.write();
2134 let state = accounts.get_or_create(&request.account_id);
2135
2136 if state.snapshots.contains_key(&db_snapshot_identifier) {
2137 return Err(AwsServiceError::aws_error(
2138 StatusCode::CONFLICT,
2139 "DBSnapshotAlreadyExists",
2140 format!("DBSnapshot {db_snapshot_identifier} already exists."),
2141 ));
2142 }
2143
2144 let snapshot = DbSnapshot {
2145 db_snapshot_identifier: db_snapshot_identifier.clone(),
2146 db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
2147 db_instance_identifier: instance.db_instance_identifier.clone(),
2148 snapshot_create_time: Utc::now(),
2149 engine: instance.engine.clone(),
2150 engine_version: instance.engine_version.clone(),
2151 allocated_storage: instance.allocated_storage,
2152 status: "available".to_string(),
2153 port: instance.port,
2154 master_username: instance.master_username.clone(),
2155 db_name: instance.db_name.clone(),
2156 dbi_resource_id: instance.dbi_resource_id.clone(),
2157 snapshot_type: "manual".to_string(),
2158 master_user_password: instance.master_user_password.clone(),
2159 tags: Vec::new(),
2160 dump_data,
2161 availability_zone: instance.availability_zone.clone(),
2162 vpc_id: None,
2163 instance_create_time: Some(instance.created_at),
2164 license_model: Some(
2165 service_helpers::license_model_for_engine(&instance.engine).to_string(),
2166 ),
2167 iops: instance.iops,
2168 option_group_name: instance.option_group_name.clone(),
2169 percent_progress: Some(100),
2170 storage_type: instance.storage_type.clone(),
2171 encrypted: instance.storage_encrypted,
2172 kms_key_id: instance.kms_key_id.clone(),
2173 iam_database_authentication_enabled: instance.iam_database_authentication_enabled,
2174 timezone: None,
2175 storage_throughput: None,
2176 };
2177
2178 state
2179 .snapshots
2180 .insert(db_snapshot_identifier.clone(), snapshot.clone());
2181 let snapshot_arn = snapshot.db_snapshot_arn.clone();
2182 drop(accounts);
2183
2184 self.emit_event(
2185 RdsSourceType::DbSnapshot,
2186 &db_snapshot_identifier,
2187 &snapshot_arn,
2188 "RDS-EVENT-0042",
2189 &["creation"],
2190 "Manual snapshot created",
2191 );
2192
2193 Ok(AwsResponse::xml(
2194 StatusCode::OK,
2195 query_response_xml(
2196 "CreateDBSnapshot",
2197 RDS_NS,
2198 &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
2199 &request.request_id,
2200 ),
2201 ))
2202 }
2203
2204 fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2205 let db_snapshot_identifier = optional_query_param(request, "DBSnapshotIdentifier");
2206 let db_instance_identifier = optional_query_param(request, "DBInstanceIdentifier");
2207 let marker = optional_query_param(request, "Marker");
2208 let max_records = optional_query_param(request, "MaxRecords");
2209
2210 let accounts = self.state.read();
2216 let empty = RdsState::new(&request.account_id, &request.region);
2217 let state = accounts.get(&request.account_id).unwrap_or(&empty);
2218
2219 if let Some(snapshot_id) = db_snapshot_identifier {
2221 let snapshot = state
2222 .snapshots
2223 .get(&snapshot_id)
2224 .cloned()
2225 .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
2226
2227 return Ok(AwsResponse::xml(
2228 StatusCode::OK,
2229 query_response_xml(
2230 "DescribeDBSnapshots",
2231 RDS_NS,
2232 &format!(
2233 "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
2234 db_snapshot_xml(&snapshot)
2235 ),
2236 &request.request_id,
2237 ),
2238 ));
2239 }
2240
2241 let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
2243 state
2244 .snapshots
2245 .values()
2246 .filter(|s| s.db_instance_identifier == instance_id)
2247 .cloned()
2248 .collect()
2249 } else {
2250 state.snapshots.values().cloned().collect()
2251 };
2252
2253 snapshots.sort_by(|a, b| {
2255 a.snapshot_create_time
2256 .cmp(&b.snapshot_create_time)
2257 .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
2258 });
2259
2260 let paginated = paginate(snapshots, marker, max_records, |snap| {
2262 &snap.db_snapshot_identifier
2263 })?;
2264
2265 let marker_xml = paginated
2266 .next_marker
2267 .as_ref()
2268 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
2269 .unwrap_or_default();
2270
2271 Ok(AwsResponse::xml(
2272 StatusCode::OK,
2273 query_response_xml(
2274 "DescribeDBSnapshots",
2275 RDS_NS,
2276 &format!(
2277 "<DBSnapshots>{}</DBSnapshots>{}",
2278 paginated
2279 .items
2280 .iter()
2281 .map(|snapshot| format!(
2282 "<DBSnapshot>{}</DBSnapshot>",
2283 db_snapshot_xml(snapshot)
2284 ))
2285 .collect::<String>(),
2286 marker_xml
2287 ),
2288 &request.request_id,
2289 ),
2290 ))
2291 }
2292
2293 fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2294 let db_snapshot_identifier = required_query_param(request, "DBSnapshotIdentifier")?;
2295
2296 let mut accounts = self.state.write();
2297 let state = accounts.get_or_create(&request.account_id);
2298
2299 let snapshot = state
2300 .snapshots
2301 .remove(&db_snapshot_identifier)
2302 .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
2303 let snapshot_arn = snapshot.db_snapshot_arn.clone();
2304 drop(accounts);
2305
2306 self.emit_event(
2307 RdsSourceType::DbSnapshot,
2308 &db_snapshot_identifier,
2309 &snapshot_arn,
2310 "RDS-EVENT-0041",
2311 &["deletion"],
2312 "Manual snapshot deleted",
2313 );
2314
2315 Ok(AwsResponse::xml(
2316 StatusCode::OK,
2317 query_response_xml(
2318 "DeleteDBSnapshot",
2319 RDS_NS,
2320 &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
2321 &request.request_id,
2322 ),
2323 ))
2324 }
2325
2326 async fn restore_db_instance_from_db_snapshot(
2327 &self,
2328 request: &AwsRequest,
2329 ) -> Result<AwsResponse, AwsServiceError> {
2330 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
2331 let db_snapshot_identifier = optional_query_param(request, "DBSnapshotIdentifier")
2335 .or_else(|| optional_query_param(request, "DBClusterSnapshotIdentifier"))
2336 .ok_or_else(|| db_snapshot_not_found("(none)"))?;
2337 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
2338 let tags = parse_tags(request)?;
2339
2340 let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
2341 let mut accounts = self.state.write();
2342 let state = accounts.get_or_create(&request.account_id);
2343
2344 if !state.begin_instance_creation(&db_instance_identifier) {
2345 return Err(AwsServiceError::aws_error(
2346 StatusCode::CONFLICT,
2347 "DBInstanceAlreadyExists",
2348 format!("DBInstance {db_instance_identifier} already exists."),
2349 ));
2350 }
2351
2352 let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
2353 Some(s) => s,
2354 None => {
2355 state.cancel_instance_creation(&db_instance_identifier);
2356 return Err(db_snapshot_not_found(&db_snapshot_identifier));
2357 }
2358 };
2359
2360 let dbi_resource_id = state.next_dbi_resource_id();
2361 let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
2362 let created_at = Utc::now();
2363
2364 (snapshot, dbi_resource_id, db_instance_arn, created_at)
2365 };
2366
2367 let runtime = self.require_runtime()?;
2370
2371 let db_name = snapshot
2372 .db_name
2373 .as_deref()
2374 .unwrap_or(default_db_name(&snapshot.engine));
2375 let running = match runtime
2376 .ensure_postgres(
2377 &db_instance_identifier,
2378 &snapshot.engine,
2379 &snapshot.engine_version,
2380 &snapshot.master_username,
2381 &snapshot.master_user_password,
2382 db_name,
2383 &request.account_id,
2384 &request.region,
2385 )
2386 .await
2387 {
2388 Ok(running) => running,
2389 Err(e) => {
2390 self.state
2391 .write()
2392 .get_or_create(&request.account_id)
2393 .cancel_instance_creation(&db_instance_identifier);
2394 return Err(runtime_error_to_service_error(e));
2395 }
2396 };
2397
2398 if let Err(e) = runtime
2399 .restore_database(
2400 &db_instance_identifier,
2401 &snapshot.engine,
2402 &snapshot.master_username,
2403 &snapshot.master_user_password,
2404 db_name,
2405 &snapshot.dump_data,
2406 )
2407 .await
2408 {
2409 self.state
2410 .write()
2411 .get_or_create(&request.account_id)
2412 .cancel_instance_creation(&db_instance_identifier);
2413 runtime.stop_container(&db_instance_identifier).await;
2414 return Err(runtime_error_to_service_error(e));
2415 }
2416
2417 let instance = build_restored_instance(
2418 &db_instance_identifier,
2419 db_instance_arn,
2420 dbi_resource_id,
2421 created_at,
2422 vpc_security_group_ids,
2423 &snapshot,
2424 &running,
2425 tags,
2426 );
2427
2428 self.state
2429 .write()
2430 .get_or_create(&request.account_id)
2431 .finish_instance_creation(instance.clone());
2432
2433 self.emit_event(
2434 RdsSourceType::DbInstance,
2435 &db_instance_identifier,
2436 &instance.db_instance_arn,
2437 "RDS-EVENT-0043",
2438 &["creation"],
2439 "DB instance restored from snapshot",
2440 );
2441
2442 Ok(AwsResponse::xml(
2443 StatusCode::OK,
2444 query_response_xml(
2445 "RestoreDBInstanceFromDBSnapshot",
2446 RDS_NS,
2447 &format!(
2448 "<DBInstance>{}</DBInstance>",
2449 db_instance_xml(&instance, None)
2450 ),
2451 &request.request_id,
2452 ),
2453 ))
2454 }
2455
2456 async fn create_db_instance_read_replica(
2457 &self,
2458 request: &AwsRequest,
2459 ) -> Result<AwsResponse, AwsServiceError> {
2460 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
2461 let source_db_instance_identifier =
2467 optional_query_param(request, "SourceDBInstanceIdentifier")
2468 .or_else(|| optional_query_param(request, "SourceDBClusterIdentifier"))
2469 .ok_or_else(|| db_instance_not_found("(none)"))?;
2470
2471 let (source_instance, db_name) = {
2472 let mut accounts = self.state.write();
2473 let state = accounts.get_or_create(&request.account_id);
2474
2475 if !state.begin_instance_creation(&db_instance_identifier) {
2476 return Err(AwsServiceError::aws_error(
2477 StatusCode::CONFLICT,
2478 "DBInstanceAlreadyExists",
2479 format!("DBInstance {db_instance_identifier} already exists."),
2480 ));
2481 }
2482
2483 let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
2484 {
2485 Some(inst) => inst,
2486 None => {
2487 state.cancel_instance_creation(&db_instance_identifier);
2488 return Err(db_instance_not_found(&source_db_instance_identifier));
2489 }
2490 };
2491
2492 let default_db = default_db_name(&source_instance.engine);
2493 let db_name = source_instance
2494 .db_name
2495 .as_deref()
2496 .unwrap_or(default_db)
2497 .to_string();
2498
2499 (source_instance, db_name)
2500 };
2501
2502 let runtime = self.runtime.as_ref().ok_or_else(|| {
2506 AwsServiceError::aws_error(
2507 StatusCode::SERVICE_UNAVAILABLE,
2508 "InsufficientDBInstanceCapacity",
2509 "Docker/Podman is required for RDS read replicas but is not available",
2510 )
2511 })?;
2512
2513 let dump_data = match runtime
2514 .dump_database(
2515 &source_db_instance_identifier,
2516 &source_instance.engine,
2517 &source_instance.master_username,
2518 &source_instance.master_user_password,
2519 &db_name,
2520 )
2521 .await
2522 {
2523 Ok(data) => data,
2524 Err(e) => {
2525 self.state
2526 .write()
2527 .get_or_create(&request.account_id)
2528 .cancel_instance_creation(&db_instance_identifier);
2529 return Err(runtime_error_to_service_error(e));
2530 }
2531 };
2532
2533 let (dbi_resource_id, db_instance_arn) = {
2534 let accounts = self.state.read();
2535 let empty = RdsState::new(&request.account_id, &request.region);
2536 let s = accounts.get(&request.account_id).unwrap_or(&empty);
2537 (
2538 s.next_dbi_resource_id(),
2539 s.db_instance_arn(&db_instance_identifier),
2540 )
2541 };
2542 let created_at = Utc::now();
2543
2544 let running = match runtime
2545 .ensure_postgres(
2546 &db_instance_identifier,
2547 &source_instance.engine,
2548 &source_instance.engine_version,
2549 &source_instance.master_username,
2550 &source_instance.master_user_password,
2551 &db_name,
2552 &request.account_id,
2553 &request.region,
2554 )
2555 .await
2556 {
2557 Ok(running) => running,
2558 Err(e) => {
2559 self.state
2560 .write()
2561 .get_or_create(&request.account_id)
2562 .cancel_instance_creation(&db_instance_identifier);
2563 return Err(runtime_error_to_service_error(e));
2564 }
2565 };
2566
2567 if let Err(e) = runtime
2568 .restore_database(
2569 &db_instance_identifier,
2570 &source_instance.engine,
2571 &source_instance.master_username,
2572 &source_instance.master_user_password,
2573 &db_name,
2574 &dump_data,
2575 )
2576 .await
2577 {
2578 self.state
2579 .write()
2580 .get_or_create(&request.account_id)
2581 .cancel_instance_creation(&db_instance_identifier);
2582 runtime.stop_container(&db_instance_identifier).await;
2583 return Err(runtime_error_to_service_error(e));
2584 }
2585
2586 let replica = build_read_replica_instance(
2587 &db_instance_identifier,
2588 db_instance_arn,
2589 dbi_resource_id,
2590 created_at,
2591 &source_db_instance_identifier,
2592 &source_instance,
2593 &running,
2594 );
2595
2596 let source_missing = {
2597 let mut accounts = self.state.write();
2598 let state = accounts.get_or_create(&request.account_id);
2599 match state.instances.get_mut(&source_db_instance_identifier) {
2600 Some(source) => {
2601 source
2602 .read_replica_db_instance_identifiers
2603 .push(db_instance_identifier.clone());
2604 state.finish_instance_creation(replica.clone());
2605 false
2606 }
2607 None => {
2608 state.cancel_instance_creation(&db_instance_identifier);
2609 true
2610 }
2611 }
2612 };
2613
2614 if source_missing {
2615 runtime.stop_container(&db_instance_identifier).await;
2616 return Err(db_instance_not_found(&source_db_instance_identifier));
2617 }
2618
2619 self.emit_event(
2620 RdsSourceType::DbInstance,
2621 &db_instance_identifier,
2622 &replica.db_instance_arn,
2623 "RDS-EVENT-0005",
2624 &["creation", "read replica"],
2625 "Read replica DB instance created",
2626 );
2627
2628 Ok(AwsResponse::xml(
2629 StatusCode::OK,
2630 query_response_xml(
2631 "CreateDBInstanceReadReplica",
2632 RDS_NS,
2633 &format!(
2634 "<DBInstance>{}</DBInstance>",
2635 db_instance_xml(&replica, None)
2636 ),
2637 &request.request_id,
2638 ),
2639 ))
2640 }
2641
2642 async fn restore_db_instance_to_point_in_time(
2643 &self,
2644 request: &AwsRequest,
2645 ) -> Result<AwsResponse, AwsServiceError> {
2646 let target_id = required_query_param(request, "TargetDBInstanceIdentifier")?;
2647 let source_id = optional_query_param(request, "SourceDBInstanceIdentifier")
2653 .or_else(|| optional_query_param(request, "SourceDbiResourceId"))
2654 .or_else(|| optional_query_param(request, "SourceDBInstanceAutomatedBackupsArn"))
2655 .ok_or_else(|| db_instance_not_found("(none)"))?;
2656 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
2657 let tags = parse_tags(request)?;
2658
2659 let (source_instance, db_name) = {
2660 let mut accounts = self.state.write();
2661 let state = accounts.get_or_create(&request.account_id);
2662
2663 if !state.begin_instance_creation(&target_id) {
2664 return Err(AwsServiceError::aws_error(
2665 StatusCode::CONFLICT,
2666 "DBInstanceAlreadyExists",
2667 format!("DBInstance {target_id} already exists."),
2668 ));
2669 }
2670
2671 let source_instance = match state.instances.get(&source_id).cloned() {
2672 Some(inst) => inst,
2673 None => {
2674 state.cancel_instance_creation(&target_id);
2675 return Err(db_instance_not_found(&source_id));
2676 }
2677 };
2678
2679 let default_db = default_db_name(&source_instance.engine);
2680 let db_name = source_instance
2681 .db_name
2682 .as_deref()
2683 .unwrap_or(default_db)
2684 .to_string();
2685
2686 (source_instance, db_name)
2687 };
2688
2689 let runtime = match self.require_runtime() {
2690 Ok(rt) => rt,
2691 Err(e) => {
2692 self.state
2693 .write()
2694 .get_or_create(&request.account_id)
2695 .cancel_instance_creation(&target_id);
2696 return Err(e);
2697 }
2698 };
2699
2700 let dump_data = match runtime
2701 .dump_database(
2702 &source_id,
2703 &source_instance.engine,
2704 &source_instance.master_username,
2705 &source_instance.master_user_password,
2706 &db_name,
2707 )
2708 .await
2709 {
2710 Ok(data) => data,
2711 Err(e) => {
2712 self.state
2713 .write()
2714 .get_or_create(&request.account_id)
2715 .cancel_instance_creation(&target_id);
2716 return Err(runtime_error_to_service_error(e));
2717 }
2718 };
2719
2720 let (dbi_resource_id, db_instance_arn) = {
2721 let accounts = self.state.read();
2722 let empty = RdsState::new(&request.account_id, &request.region);
2723 let s = accounts.get(&request.account_id).unwrap_or(&empty);
2724 (s.next_dbi_resource_id(), s.db_instance_arn(&target_id))
2725 };
2726 let created_at = Utc::now();
2727
2728 let running = match runtime
2729 .ensure_postgres(
2730 &target_id,
2731 &source_instance.engine,
2732 &source_instance.engine_version,
2733 &source_instance.master_username,
2734 &source_instance.master_user_password,
2735 &db_name,
2736 &request.account_id,
2737 &request.region,
2738 )
2739 .await
2740 {
2741 Ok(running) => running,
2742 Err(e) => {
2743 self.state
2744 .write()
2745 .get_or_create(&request.account_id)
2746 .cancel_instance_creation(&target_id);
2747 return Err(runtime_error_to_service_error(e));
2748 }
2749 };
2750
2751 if let Err(e) = runtime
2752 .restore_database(
2753 &target_id,
2754 &source_instance.engine,
2755 &source_instance.master_username,
2756 &source_instance.master_user_password,
2757 &db_name,
2758 &dump_data,
2759 )
2760 .await
2761 {
2762 self.state
2763 .write()
2764 .get_or_create(&request.account_id)
2765 .cancel_instance_creation(&target_id);
2766 runtime.stop_container(&target_id).await;
2767 return Err(runtime_error_to_service_error(e));
2768 }
2769
2770 let restore_to_time = required_query_param(request, "RestoreTime")
2771 .ok()
2772 .or_else(|| required_query_param(request, "RestoreToTime").ok());
2773 let use_latest = required_query_param(request, "UseLatestRestorableTime")
2774 .ok()
2775 .map(|s| s.eq_ignore_ascii_case("true"))
2776 .unwrap_or(false);
2777
2778 let mut instance = build_pit_restored_instance(
2779 &target_id,
2780 db_instance_arn,
2781 dbi_resource_id,
2782 created_at,
2783 vpc_security_group_ids,
2784 &source_instance,
2785 &running,
2786 tags,
2787 );
2788
2789 if let Some(t) = restore_to_time.as_ref() {
2790 if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(t) {
2791 instance.latest_restorable_time = Some(parsed.with_timezone(&Utc));
2792 }
2793 } else if use_latest {
2794 instance.latest_restorable_time = source_instance.latest_restorable_time;
2795 }
2796
2797 self.state
2798 .write()
2799 .get_or_create(&request.account_id)
2800 .finish_instance_creation(instance.clone());
2801
2802 self.emit_event(
2803 RdsSourceType::DbInstance,
2804 &target_id,
2805 &instance.db_instance_arn,
2806 "RDS-EVENT-0008",
2807 &["creation"],
2808 "DB instance restored to point in time",
2809 );
2810
2811 Ok(AwsResponse::xml(
2812 StatusCode::OK,
2813 query_response_xml(
2814 "RestoreDBInstanceToPointInTime",
2815 RDS_NS,
2816 &format!(
2817 "<DBInstance>{}</DBInstance>",
2818 db_instance_xml(&instance, None)
2819 ),
2820 &request.request_id,
2821 ),
2822 ))
2823 }
2824
2825 async fn restore_db_instance_from_s3(
2826 &self,
2827 request: &AwsRequest,
2828 ) -> Result<AwsResponse, AwsServiceError> {
2829 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
2830 let s3_bucket = required_query_param(request, "S3BucketName")?;
2831 let s3_prefix = optional_query_param(request, "S3Prefix").unwrap_or_default();
2832 let master_username =
2836 optional_query_param(request, "MasterUsername").unwrap_or_else(|| "admin".to_string());
2837 let master_user_password = optional_query_param(request, "MasterUserPassword")
2838 .unwrap_or_else(|| "Password1!".to_string());
2839 let engine = required_query_param(request, "Engine")?;
2840 let engine_version = optional_query_param(request, "EngineVersion")
2841 .or_else(|| optional_query_param(request, "SourceEngineVersion"))
2842 .unwrap_or_else(|| match engine.as_str() {
2843 "postgres" => "16.3".to_string(),
2844 "mysql" => "8.0".to_string(),
2845 "mariadb" => "10.6".to_string(),
2846 _ => "0".to_string(),
2847 });
2848 let allocated_storage = optional_query_param(request, "AllocatedStorage")
2849 .and_then(|s| s.parse::<i32>().ok())
2850 .unwrap_or(20);
2851 let db_instance_class = optional_query_param(request, "DBInstanceClass")
2852 .unwrap_or_else(|| "db.t3.micro".to_string());
2853 let db_name_opt = optional_query_param(request, "DBName");
2854 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
2855 let tags = parse_tags(request)?;
2856
2857 let bus = self.delivery_bus.as_ref().ok_or_else(|| {
2858 AwsServiceError::aws_error(
2859 StatusCode::SERVICE_UNAVAILABLE,
2860 "InvalidS3BucketFault",
2861 "S3 client not wired into RDS service",
2862 )
2863 })?;
2864
2865 let dump_data = bus
2866 .get_object_from_s3(&request.account_id, &s3_bucket, &s3_prefix)
2867 .map_err(|e| {
2868 AwsServiceError::aws_error(
2869 StatusCode::BAD_REQUEST,
2870 "InvalidS3BucketFault",
2871 format!("S3 backup at {s3_bucket}/{s3_prefix} unavailable: {e}"),
2872 )
2873 })?;
2874
2875 let runtime = self.require_runtime()?;
2876
2877 let (dbi_resource_id, db_instance_arn) = {
2878 let mut accounts = self.state.write();
2879 let state = accounts.get_or_create(&request.account_id);
2880
2881 if !state.begin_instance_creation(&db_instance_identifier) {
2882 return Err(AwsServiceError::aws_error(
2883 StatusCode::CONFLICT,
2884 "DBInstanceAlreadyExists",
2885 format!("DBInstance {db_instance_identifier} already exists."),
2886 ));
2887 }
2888
2889 (
2890 state.next_dbi_resource_id(),
2891 state.db_instance_arn(&db_instance_identifier),
2892 )
2893 };
2894
2895 let db_name = db_name_opt.unwrap_or_else(|| default_db_name(&engine).to_string());
2896 let created_at = Utc::now();
2897
2898 let running = match runtime
2899 .ensure_postgres(
2900 &db_instance_identifier,
2901 &engine,
2902 &engine_version,
2903 &master_username,
2904 &master_user_password,
2905 &db_name,
2906 &request.account_id,
2907 &request.region,
2908 )
2909 .await
2910 {
2911 Ok(running) => running,
2912 Err(e) => {
2913 self.state
2914 .write()
2915 .get_or_create(&request.account_id)
2916 .cancel_instance_creation(&db_instance_identifier);
2917 return Err(runtime_error_to_service_error(e));
2918 }
2919 };
2920
2921 if let Err(e) = runtime
2922 .restore_database(
2923 &db_instance_identifier,
2924 &engine,
2925 &master_username,
2926 &master_user_password,
2927 &db_name,
2928 &dump_data,
2929 )
2930 .await
2931 {
2932 self.state
2933 .write()
2934 .get_or_create(&request.account_id)
2935 .cancel_instance_creation(&db_instance_identifier);
2936 runtime.stop_container(&db_instance_identifier).await;
2937 return Err(runtime_error_to_service_error(e));
2938 }
2939
2940 let instance = build_s3_restored_instance(
2941 &db_instance_identifier,
2942 db_instance_arn,
2943 dbi_resource_id,
2944 created_at,
2945 allocated_storage,
2946 db_instance_class,
2947 engine.clone(),
2948 engine_version,
2949 master_username,
2950 master_user_password,
2951 db_name,
2952 vpc_security_group_ids,
2953 &running,
2954 tags,
2955 );
2956
2957 self.state
2958 .write()
2959 .get_or_create(&request.account_id)
2960 .finish_instance_creation(instance.clone());
2961
2962 self.emit_event(
2963 RdsSourceType::DbInstance,
2964 &db_instance_identifier,
2965 &instance.db_instance_arn,
2966 "RDS-EVENT-0043",
2967 &["creation"],
2968 "DB instance restored from S3 backup",
2969 );
2970
2971 Ok(AwsResponse::xml(
2972 StatusCode::OK,
2973 query_response_xml(
2974 "RestoreDBInstanceFromS3",
2975 RDS_NS,
2976 &format!(
2977 "<DBInstance>{}</DBInstance>",
2978 db_instance_xml(&instance, None)
2979 ),
2980 &request.request_id,
2981 ),
2982 ))
2983 }
2984
2985 async fn create_db_cluster_snapshot(
2990 &self,
2991 request: &AwsRequest,
2992 ) -> Result<AwsResponse, AwsServiceError> {
2993 use serde_json::json;
2994 let snapshot_id = required_query_param(request, "DBClusterSnapshotIdentifier")?;
2995 let cluster_id = required_query_param(request, "DBClusterIdentifier")?;
2996 let arn = format!(
2997 "arn:aws:rds:{}:{}:cluster-snapshot:{}",
2998 request.region, request.account_id, snapshot_id
2999 );
3000
3001 let writer_info = {
3002 let accounts = self.state.read();
3003 accounts.get(&request.account_id).and_then(|state| {
3004 let cluster_entry = state.extras.get("clusters")?.get(&cluster_id)?;
3005 let writer_id = cluster_entry
3006 .get("WriterDBInstanceIdentifier")
3007 .and_then(|v| v.as_str())
3008 .map(str::to_string)
3009 .or_else(|| {
3010 cluster_entry
3011 .get("DBClusterMembers")
3012 .and_then(|m| m.as_array())
3013 .and_then(|arr| {
3014 arr.iter()
3015 .find(|m| m["IsClusterWriter"].as_bool() == Some(true))
3016 .or_else(|| arr.first())
3017 .and_then(|m| m["DBInstanceIdentifier"].as_str())
3018 .map(str::to_string)
3019 })
3020 })?;
3021 let inst = state.instances.get(&writer_id)?;
3022 Some((
3023 inst.db_instance_identifier.clone(),
3024 inst.engine.clone(),
3025 inst.master_username.clone(),
3026 inst.master_user_password.clone(),
3027 inst.db_name
3028 .clone()
3029 .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
3030 ))
3031 })
3032 };
3033
3034 let dump_b64 = if let Some((wid, eng, user, pass, db)) = writer_info {
3035 if let Some(runtime) = self.runtime_ref() {
3036 match runtime.dump_database(&wid, &eng, &user, &pass, &db).await {
3037 Ok(data) => {
3038 use base64::Engine;
3039 Some(base64::engine::general_purpose::STANDARD.encode(&data))
3040 }
3041 Err(e) => {
3042 tracing::warn!(
3043 error = %e,
3044 cluster = %cluster_id,
3045 writer = %wid,
3046 "cluster snapshot dump failed; falling back to metadata-only snapshot"
3047 );
3048 None
3049 }
3050 }
3051 } else {
3052 None
3053 }
3054 } else {
3055 None
3056 };
3057
3058 {
3059 let mut accounts = self.state.write();
3060 let state = accounts.get_or_create(&request.account_id);
3061 let mut entry = state
3062 .extras
3063 .get("clusters")
3064 .and_then(|m| m.get(&cluster_id))
3065 .cloned()
3066 .unwrap_or_else(|| json!({}));
3067 if let Some(obj) = entry.as_object_mut() {
3068 obj.insert(
3069 "DBClusterSnapshotIdentifier".to_string(),
3070 json!(snapshot_id),
3071 );
3072 obj.insert("DBClusterSnapshotArn".to_string(), json!(arn));
3073 obj.insert("DBClusterIdentifier".to_string(), json!(cluster_id));
3074 obj.insert("Status".to_string(), json!("available"));
3075 obj.insert("SnapshotType".to_string(), json!("manual"));
3076 if let Some(b64) = dump_b64.as_ref() {
3077 obj.insert("DumpDataB64".to_string(), json!(b64));
3078 }
3079 }
3080 state
3081 .extras
3082 .entry("cluster_snapshots".to_string())
3083 .or_default()
3084 .insert(snapshot_id.clone(), entry);
3085 }
3086
3087 self.emit_event(
3088 RdsSourceType::DbClusterSnapshot,
3089 &snapshot_id,
3090 &arn,
3091 "RDS-EVENT-0074",
3092 &["backup"],
3093 "DB cluster snapshot created",
3094 );
3095
3096 Ok(AwsResponse::xml(
3097 StatusCode::OK,
3098 query_response_xml(
3099 "CreateDBClusterSnapshot",
3100 RDS_NS,
3101 &crate::extras::cluster_snapshot_xml(&snapshot_id, &arn, &cluster_id),
3102 &request.request_id,
3103 ),
3104 ))
3105 }
3106
3107 async fn restore_db_cluster_from_snapshot(
3113 &self,
3114 request: &AwsRequest,
3115 ) -> Result<AwsResponse, AwsServiceError> {
3116 use serde_json::json;
3117 let target = required_query_param(request, "DBClusterIdentifier")?;
3118 let snapshot_id = optional_query_param(request, "SnapshotIdentifier")
3119 .or_else(|| optional_query_param(request, "DBClusterSnapshotIdentifier"))
3120 .ok_or_else(|| {
3121 AwsServiceError::aws_error(
3126 StatusCode::NOT_FOUND,
3127 "DBClusterSnapshotNotFoundFault",
3128 "SnapshotIdentifier is required",
3129 )
3130 })?;
3131 let arn = format!(
3132 "arn:aws:rds:{}:{}:cluster:{}",
3133 request.region, request.account_id, target
3134 );
3135
3136 let mut accounts = self.state.write();
3137 let state = accounts.get_or_create(&request.account_id);
3138 let snapshot = state
3139 .extras
3140 .get("cluster_snapshots")
3141 .and_then(|m| m.get(&snapshot_id))
3142 .cloned()
3143 .ok_or_else(|| {
3144 AwsServiceError::aws_error(
3145 StatusCode::NOT_FOUND,
3146 "DBClusterSnapshotNotFoundFault",
3147 format!("DBClusterSnapshot {snapshot_id} not found."),
3148 )
3149 })?;
3150 let source_cluster_id = snapshot
3151 .get("DBClusterIdentifier")
3152 .and_then(|v| v.as_str())
3153 .unwrap_or("")
3154 .to_string();
3155 let pending_dump_b64 = snapshot
3156 .get("DumpDataB64")
3157 .and_then(|v| v.as_str())
3158 .map(str::to_string);
3159
3160 let mut entry = state
3161 .extras
3162 .get("clusters")
3163 .and_then(|m| m.get(&source_cluster_id))
3164 .cloned()
3165 .unwrap_or_else(|| {
3166 json!({
3167 "Engine": optional_query_param(request, "Engine").unwrap_or_else(|| "aurora-postgresql".to_string()),
3168 "EngineVersion": optional_query_param(request, "EngineVersion").unwrap_or_else(|| "15.3".to_string()),
3169 "MasterUsername": "postgres",
3170 "Port": 5432,
3171 })
3172 });
3173 if let Some(obj) = entry.as_object_mut() {
3174 obj.insert("DBClusterIdentifier".to_string(), json!(target));
3175 obj.insert("DBClusterArn".to_string(), json!(arn));
3176 obj.insert("Status".to_string(), json!("available"));
3177 obj.insert(
3178 "Endpoint".to_string(),
3179 json!(format!(
3180 "{target}.cluster-xxx.{}.rds.amazonaws.com",
3181 request.region
3182 )),
3183 );
3184 obj.insert(
3185 "ReaderEndpoint".to_string(),
3186 json!(format!(
3187 "{target}.cluster-ro-xxx.{}.rds.amazonaws.com",
3188 request.region
3189 )),
3190 );
3191 obj.remove("ReplicationSourceIdentifier");
3192 obj.remove("DBClusterMembers");
3193 obj.remove("WriterDBInstanceIdentifier");
3194 obj.remove("DBClusterSnapshotIdentifier");
3195 obj.remove("DBClusterSnapshotArn");
3196 obj.remove("DumpDataB64");
3197 if let Some(engine) = optional_query_param(request, "Engine") {
3198 obj.insert("Engine".to_string(), json!(engine));
3199 }
3200 if let Some(version) = optional_query_param(request, "EngineVersion") {
3201 obj.insert("EngineVersion".to_string(), json!(version));
3202 }
3203 if let Some(port) =
3204 optional_query_param(request, "Port").and_then(|p| p.parse::<i64>().ok())
3205 {
3206 obj.insert("Port".to_string(), json!(port));
3207 }
3208 if let Some(b64) = pending_dump_b64 {
3209 obj.insert("PendingRestoreDumpB64".to_string(), json!(b64));
3210 }
3211 }
3212 state
3213 .extras
3214 .entry("clusters".to_string())
3215 .or_default()
3216 .insert(target.clone(), entry);
3217 drop(accounts);
3218
3219 self.emit_event(
3220 RdsSourceType::DbCluster,
3221 &target,
3222 &arn,
3223 "RDS-EVENT-0170",
3224 &["creation"],
3225 "DB cluster restored from snapshot",
3226 );
3227
3228 Ok(AwsResponse::xml(
3229 StatusCode::OK,
3230 query_response_xml(
3231 "RestoreDBClusterFromSnapshot",
3232 RDS_NS,
3233 &crate::extras::db_cluster_xml(&target, &arn),
3234 &request.request_id,
3235 ),
3236 ))
3237 }
3238
3239 async fn restore_db_cluster_to_point_in_time(
3244 &self,
3245 request: &AwsRequest,
3246 ) -> Result<AwsResponse, AwsServiceError> {
3247 use serde_json::json;
3248 let target = required_query_param(request, "DBClusterIdentifier")?;
3249 let source = optional_query_param(request, "SourceDBClusterIdentifier")
3255 .or_else(|| optional_query_param(request, "SourceDbClusterResourceId"))
3256 .ok_or_else(|| {
3257 AwsServiceError::aws_error(
3258 StatusCode::NOT_FOUND,
3259 "DBClusterNotFoundFault",
3260 "Source DB cluster identifier not provided.",
3261 )
3262 })?;
3263 let arn = format!(
3264 "arn:aws:rds:{}:{}:cluster:{}",
3265 request.region, request.account_id, target
3266 );
3267
3268 let writer_info = {
3269 let accounts = self.state.read();
3270 accounts.get(&request.account_id).and_then(|state| {
3271 let cluster_entry = state.extras.get("clusters")?.get(&source)?;
3272 let writer_id = cluster_entry
3273 .get("WriterDBInstanceIdentifier")
3274 .and_then(|v| v.as_str())
3275 .map(str::to_string)
3276 .or_else(|| {
3277 cluster_entry
3278 .get("DBClusterMembers")
3279 .and_then(|m| m.as_array())
3280 .and_then(|arr| {
3281 arr.iter()
3282 .find(|m| m["IsClusterWriter"].as_bool() == Some(true))
3283 .or_else(|| arr.first())
3284 .and_then(|m| m["DBInstanceIdentifier"].as_str())
3285 .map(str::to_string)
3286 })
3287 })?;
3288 let inst = state.instances.get(&writer_id)?;
3289 Some((
3290 inst.db_instance_identifier.clone(),
3291 inst.engine.clone(),
3292 inst.master_username.clone(),
3293 inst.master_user_password.clone(),
3294 inst.db_name
3295 .clone()
3296 .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
3297 ))
3298 })
3299 };
3300
3301 let pending_dump_b64 = if let Some((wid, eng, user, pass, db)) = writer_info {
3302 if let Some(runtime) = self.runtime_ref() {
3303 match runtime.dump_database(&wid, &eng, &user, &pass, &db).await {
3304 Ok(data) => {
3305 use base64::Engine;
3306 Some(base64::engine::general_purpose::STANDARD.encode(&data))
3307 }
3308 Err(e) => {
3309 tracing::warn!(
3310 error = %e,
3311 cluster = %source,
3312 writer = %wid,
3313 "cluster PIT dump failed; falling back to metadata-only restore"
3314 );
3315 None
3316 }
3317 }
3318 } else {
3319 None
3320 }
3321 } else {
3322 None
3323 };
3324
3325 let mut accounts = self.state.write();
3326 let state = accounts.get_or_create(&request.account_id);
3327 let mut entry = state
3328 .extras
3329 .get("clusters")
3330 .and_then(|m| m.get(&source))
3331 .cloned()
3332 .ok_or_else(|| {
3333 AwsServiceError::aws_error(
3334 StatusCode::NOT_FOUND,
3335 "DBClusterNotFoundFault",
3336 format!("DBCluster {source} not found."),
3337 )
3338 })?;
3339 if let Some(obj) = entry.as_object_mut() {
3340 obj.insert("DBClusterIdentifier".to_string(), json!(target));
3341 obj.insert("DBClusterArn".to_string(), json!(arn));
3342 obj.insert("Status".to_string(), json!("available"));
3343 obj.insert(
3344 "Endpoint".to_string(),
3345 json!(format!(
3346 "{target}.cluster-xxx.{}.rds.amazonaws.com",
3347 request.region
3348 )),
3349 );
3350 obj.insert(
3351 "ReaderEndpoint".to_string(),
3352 json!(format!(
3353 "{target}.cluster-ro-xxx.{}.rds.amazonaws.com",
3354 request.region
3355 )),
3356 );
3357 obj.remove("DBClusterMembers");
3358 obj.remove("WriterDBInstanceIdentifier");
3359 if let Some(restore_time) = optional_query_param(request, "RestoreToTime") {
3360 obj.insert("RestoreToTime".to_string(), json!(restore_time));
3361 }
3362 if let Some(latest) = optional_query_param(request, "UseLatestRestorableTime") {
3363 obj.insert("UseLatestRestorableTime".to_string(), json!(latest));
3364 }
3365 if let Some(b64) = pending_dump_b64 {
3366 obj.insert("PendingRestoreDumpB64".to_string(), json!(b64));
3367 }
3368 }
3369 state
3370 .extras
3371 .entry("clusters".to_string())
3372 .or_default()
3373 .insert(target.clone(), entry);
3374 drop(accounts);
3375
3376 self.emit_event(
3377 RdsSourceType::DbCluster,
3378 &target,
3379 &arn,
3380 "RDS-EVENT-0171",
3381 &["creation"],
3382 "DB cluster restored to point in time",
3383 );
3384
3385 Ok(AwsResponse::xml(
3386 StatusCode::OK,
3387 query_response_xml(
3388 "RestoreDBClusterToPointInTime",
3389 RDS_NS,
3390 &crate::extras::db_cluster_xml(&target, &arn),
3391 &request.request_id,
3392 ),
3393 ))
3394 }
3395
3396 async fn describe_db_log_files(
3397 &self,
3398 request: &AwsRequest,
3399 ) -> Result<AwsResponse, AwsServiceError> {
3400 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
3401 let filename_contains = optional_query_param(request, "FilenameContains");
3402 let file_last_written =
3403 optional_query_param(request, "FileLastWritten").and_then(|s| s.parse::<i64>().ok());
3404 let file_size =
3405 optional_query_param(request, "FileSize").and_then(|s| s.parse::<i64>().ok());
3406
3407 let engine = {
3408 let accounts = self.state.read();
3409 let state = accounts
3410 .get(&request.account_id)
3411 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3412 let instance = state
3413 .instances
3414 .get(&db_instance_identifier)
3415 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3416 instance.engine.clone()
3417 };
3418
3419 let now_millis = Utc::now().timestamp_millis();
3425 let candidates: Vec<(String, i64, i64)> = match engine.as_str() {
3426 "mysql" | "mariadb" => vec![
3427 ("error/mysql-error.log".to_string(), now_millis, 1024),
3428 ("slowquery/mysql-slowquery.log".to_string(), now_millis, 512),
3429 ],
3430 _ => vec![
3431 ("error/postgres.log".to_string(), now_millis, 1024),
3432 ("trace/postgres-trace.log".to_string(), now_millis, 512),
3433 ],
3434 };
3435
3436 let filtered: Vec<(String, i64, i64)> = candidates
3437 .into_iter()
3438 .filter(|(name, written, size)| {
3439 if let Some(needle) = &filename_contains {
3440 if !name.contains(needle) {
3441 return false;
3442 }
3443 }
3444 if let Some(min_written) = file_last_written {
3445 if *written / 1000 <= min_written {
3449 return false;
3450 }
3451 }
3452 if let Some(min_size) = file_size {
3453 if *size < min_size {
3454 return false;
3455 }
3456 }
3457 true
3458 })
3459 .collect();
3460
3461 let details: String = filtered
3462 .iter()
3463 .map(|(name, written, size)| {
3464 format!(
3465 "<DescribeDBLogFilesDetails>\
3466 <LogFileName>{}</LogFileName>\
3467 <LastWritten>{}</LastWritten>\
3468 <Size>{}</Size>\
3469 </DescribeDBLogFilesDetails>",
3470 xml_escape(name),
3471 written,
3472 size,
3473 )
3474 })
3475 .collect();
3476
3477 Ok(AwsResponse::xml(
3478 StatusCode::OK,
3479 query_response_xml(
3480 "DescribeDBLogFiles",
3481 RDS_NS,
3482 &format!("<DescribeDBLogFiles>{details}</DescribeDBLogFiles>"),
3483 &request.request_id,
3484 ),
3485 ))
3486 }
3487
3488 async fn download_db_log_file_portion(
3489 &self,
3490 request: &AwsRequest,
3491 ) -> Result<AwsResponse, AwsServiceError> {
3492 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
3493 let log_file_name = required_query_param(request, "LogFileName")?;
3494 let _marker = optional_query_param(request, "Marker").unwrap_or_else(|| "0".to_string());
3495 let _number_of_lines = optional_query_param(request, "NumberOfLines")
3496 .and_then(|s| s.parse::<i64>().ok())
3497 .unwrap_or(0);
3498
3499 let engine = {
3500 let accounts = self.state.read();
3501 let state = accounts
3502 .get(&request.account_id)
3503 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3504 let instance = state
3505 .instances
3506 .get(&db_instance_identifier)
3507 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3508 instance.engine.clone()
3509 };
3510
3511 let known_synthetic = matches!(
3512 (engine.as_str(), log_file_name.as_str()),
3513 ("mysql" | "mariadb", "error/mysql-error.log")
3514 | ("mysql" | "mariadb", "slowquery/mysql-slowquery.log")
3515 | (_, "error/postgres.log")
3516 | (_, "trace/postgres-trace.log")
3517 );
3518
3519 let container_path = map_log_file_to_container_path(&engine, &log_file_name);
3520
3521 let log_data = if let Some(runtime) = self.runtime.as_ref() {
3522 match runtime
3523 .read_log_file(&db_instance_identifier, &container_path)
3524 .await
3525 {
3526 Ok(bytes) => Some(bytes),
3527 Err(RuntimeError::Unavailable) => None,
3528 Err(RuntimeError::ContainerStartFailed(_)) if known_synthetic => Some(Vec::new()),
3529 Err(RuntimeError::ContainerStartFailed(message)) => {
3530 return Err(AwsServiceError::aws_error(
3531 StatusCode::NOT_FOUND,
3532 "DBLogFileNotFoundFault",
3533 format!("DBLogFile {log_file_name} not found: {message}"),
3534 ));
3535 }
3536 }
3537 } else if known_synthetic {
3538 Some(Vec::new())
3539 } else {
3540 None
3541 };
3542
3543 let log_data = match log_data {
3544 Some(bytes) => bytes,
3545 None => {
3546 return Err(AwsServiceError::aws_error(
3547 StatusCode::NOT_FOUND,
3548 "DBLogFileNotFoundFault",
3549 format!("DBLogFile {log_file_name} not found"),
3550 ))
3551 }
3552 };
3553
3554 let payload = String::from_utf8_lossy(&log_data).into_owned();
3555 let total_bytes = payload.len();
3556
3557 Ok(AwsResponse::xml(
3558 StatusCode::OK,
3559 query_response_xml(
3560 "DownloadDBLogFilePortion",
3561 RDS_NS,
3562 &format!(
3563 "<LogFileData>{}</LogFileData>\
3564 <Marker>{}</Marker>\
3565 <AdditionalDataPending>false</AdditionalDataPending>",
3566 xml_escape(&payload),
3567 total_bytes,
3568 ),
3569 &request.request_id,
3570 ),
3571 ))
3572 }
3573
3574 fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3575 let db_subnet_group_name = required_query_param(request, "DBSubnetGroupName")?;
3576 let db_subnet_group_description =
3577 required_query_param(request, "DBSubnetGroupDescription")?;
3578 let subnet_ids = parse_subnet_ids(request)?;
3579
3580 if subnet_ids.len() < 2 {
3581 return Err(AwsServiceError::aws_error(
3582 StatusCode::BAD_REQUEST,
3583 "DBSubnetGroupDoesNotCoverEnoughAZs",
3584 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3585 ));
3586 }
3587
3588 let mut accounts = self.state.write();
3589 let state = accounts.get_or_create(&request.account_id);
3590
3591 if state.subnet_groups.contains_key(&db_subnet_group_name) {
3592 return Err(AwsServiceError::aws_error(
3593 StatusCode::CONFLICT,
3594 "DBSubnetGroupAlreadyExists",
3595 format!("DBSubnetGroup {db_subnet_group_name} already exists."),
3596 ));
3597 }
3598
3599 let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
3600 let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
3601 .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
3602 .collect();
3603
3604 let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
3606 if unique_azs.len() < 2 {
3607 return Err(AwsServiceError::aws_error(
3608 StatusCode::BAD_REQUEST,
3609 "DBSubnetGroupDoesNotCoverEnoughAZs",
3610 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3611 ));
3612 }
3613
3614 let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
3615 let tags = parse_tags(request)?;
3616
3617 let subnet_group = DbSubnetGroup {
3618 db_subnet_group_name: db_subnet_group_name.clone(),
3619 db_subnet_group_arn,
3620 db_subnet_group_description,
3621 vpc_id,
3622 subnet_ids,
3623 subnet_availability_zones,
3624 tags,
3625 };
3626
3627 state
3628 .subnet_groups
3629 .insert(db_subnet_group_name, subnet_group.clone());
3630
3631 Ok(AwsResponse::xml(
3632 StatusCode::OK,
3633 query_response_xml(
3634 "CreateDBSubnetGroup",
3635 RDS_NS,
3636 &format!(
3637 "<DBSubnetGroup>{}</DBSubnetGroup>",
3638 db_subnet_group_xml(&subnet_group)
3639 ),
3640 &request.request_id,
3641 ),
3642 ))
3643 }
3644
3645 fn describe_db_subnet_groups(
3646 &self,
3647 request: &AwsRequest,
3648 ) -> Result<AwsResponse, AwsServiceError> {
3649 let db_subnet_group_name = optional_query_param(request, "DBSubnetGroupName");
3650 let marker = optional_query_param(request, "Marker");
3651 let max_records = optional_query_param(request, "MaxRecords");
3652
3653 let accounts = self.state.read();
3654 let empty = RdsState::new(&request.account_id, &request.region);
3655 let state = accounts.get(&request.account_id).unwrap_or(&empty);
3656
3657 if let Some(name) = db_subnet_group_name {
3659 let sg = state.subnet_groups.get(&name).ok_or_else(|| {
3660 AwsServiceError::aws_error(
3661 StatusCode::NOT_FOUND,
3662 "DBSubnetGroupNotFoundFault",
3663 format!("DBSubnetGroup {} not found.", name),
3664 )
3665 })?;
3666
3667 return Ok(AwsResponse::xml(
3668 StatusCode::OK,
3669 query_response_xml(
3670 "DescribeDBSubnetGroups",
3671 RDS_NS,
3672 &format!(
3673 "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
3674 db_subnet_group_xml(sg)
3675 ),
3676 &request.request_id,
3677 ),
3678 ));
3679 }
3680
3681 let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
3683 subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
3684
3685 let paginated = paginate(subnet_groups, marker, max_records, |sg| {
3687 &sg.db_subnet_group_name
3688 })?;
3689
3690 let marker_xml = paginated
3691 .next_marker
3692 .as_ref()
3693 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
3694 .unwrap_or_default();
3695
3696 let body = paginated
3697 .items
3698 .iter()
3699 .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
3700 .collect::<Vec<_>>()
3701 .join("");
3702
3703 Ok(AwsResponse::xml(
3704 StatusCode::OK,
3705 query_response_xml(
3706 "DescribeDBSubnetGroups",
3707 RDS_NS,
3708 &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
3709 &request.request_id,
3710 ),
3711 ))
3712 }
3713
3714 fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3715 let db_subnet_group_name = required_query_param(request, "DBSubnetGroupName")?;
3716
3717 let mut accounts = self.state.write();
3718 let state = accounts.get_or_create(&request.account_id);
3719
3720 if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
3721 return Err(AwsServiceError::aws_error(
3722 StatusCode::NOT_FOUND,
3723 "DBSubnetGroupNotFoundFault",
3724 format!("DBSubnetGroup {db_subnet_group_name} not found."),
3725 ));
3726 }
3727
3728 Ok(AwsResponse::xml(
3729 StatusCode::OK,
3730 query_response_xml("DeleteDBSubnetGroup", RDS_NS, "", &request.request_id),
3731 ))
3732 }
3733
3734 fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3735 let db_subnet_group_name = required_query_param(request, "DBSubnetGroupName")?;
3736 let subnet_ids = parse_subnet_ids(request)?;
3737
3738 if subnet_ids.len() < 2 {
3739 return Err(AwsServiceError::aws_error(
3740 StatusCode::BAD_REQUEST,
3741 "DBSubnetGroupDoesNotCoverEnoughAZs",
3742 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3743 ));
3744 }
3745
3746 let mut accounts = self.state.write();
3747 let state = accounts.get_or_create(&request.account_id);
3748
3749 let region = state.region.clone();
3750
3751 let subnet_group = state
3752 .subnet_groups
3753 .get_mut(&db_subnet_group_name)
3754 .ok_or_else(|| {
3755 AwsServiceError::aws_error(
3756 StatusCode::NOT_FOUND,
3757 "DBSubnetGroupNotFoundFault",
3758 format!("DBSubnetGroup {db_subnet_group_name} not found."),
3759 )
3760 })?;
3761
3762 let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
3763 .map(|i| format!("{}{}", ®ion, char::from(b'a' + (i % 6) as u8)))
3764 .collect();
3765
3766 let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
3768 if unique_azs.len() < 2 {
3769 return Err(AwsServiceError::aws_error(
3770 StatusCode::BAD_REQUEST,
3771 "DBSubnetGroupDoesNotCoverEnoughAZs",
3772 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3773 ));
3774 }
3775
3776 subnet_group.subnet_ids = subnet_ids;
3777 subnet_group.subnet_availability_zones = subnet_availability_zones;
3778
3779 let subnet_group_clone = subnet_group.clone();
3780
3781 Ok(AwsResponse::xml(
3782 StatusCode::OK,
3783 query_response_xml(
3784 "ModifyDBSubnetGroup",
3785 RDS_NS,
3786 &format!(
3787 "<DBSubnetGroup>{}</DBSubnetGroup>",
3788 db_subnet_group_xml(&subnet_group_clone)
3789 ),
3790 &request.request_id,
3791 ),
3792 ))
3793 }
3794
3795 fn create_db_parameter_group(
3796 &self,
3797 request: &AwsRequest,
3798 ) -> Result<AwsResponse, AwsServiceError> {
3799 let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
3800 let db_parameter_group_family = required_query_param(request, "DBParameterGroupFamily")?;
3801 let description = required_query_param(request, "Description")?;
3802
3803 let mut accounts = self.state.write();
3811 let state = accounts.get_or_create(&request.account_id);
3812
3813 if state
3814 .parameter_groups
3815 .contains_key(&db_parameter_group_name)
3816 {
3817 return Err(AwsServiceError::aws_error(
3818 StatusCode::CONFLICT,
3819 "DBParameterGroupAlreadyExists",
3820 format!("DBParameterGroup {db_parameter_group_name} already exists."),
3821 ));
3822 }
3823
3824 let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
3825 let tags = parse_tags(request)?;
3826
3827 let parameter_group = DbParameterGroup {
3828 db_parameter_group_name: db_parameter_group_name.clone(),
3829 db_parameter_group_arn,
3830 db_parameter_group_family,
3831 description,
3832 parameters: std::collections::BTreeMap::new(),
3833 tags,
3834 };
3835
3836 state
3837 .parameter_groups
3838 .insert(db_parameter_group_name.clone(), parameter_group.clone());
3839 let arn = parameter_group.db_parameter_group_arn.clone();
3840 drop(accounts);
3841
3842 self.emit_event(
3843 RdsSourceType::DbParameterGroup,
3844 &db_parameter_group_name,
3845 &arn,
3846 "RDS-EVENT-0179",
3847 &["creation"],
3848 "DB parameter group created",
3849 );
3850
3851 Ok(AwsResponse::xml(
3852 StatusCode::OK,
3853 query_response_xml(
3854 "CreateDBParameterGroup",
3855 RDS_NS,
3856 &format!(
3857 "<DBParameterGroup>{}</DBParameterGroup>",
3858 db_parameter_group_xml(¶meter_group)
3859 ),
3860 &request.request_id,
3861 ),
3862 ))
3863 }
3864
3865 fn describe_db_parameter_groups(
3866 &self,
3867 request: &AwsRequest,
3868 ) -> Result<AwsResponse, AwsServiceError> {
3869 let db_parameter_group_name = optional_query_param(request, "DBParameterGroupName");
3870 let marker = optional_query_param(request, "Marker");
3871 let max_records = optional_query_param(request, "MaxRecords");
3872
3873 let accounts = self.state.read();
3874 let empty = RdsState::new(&request.account_id, &request.region);
3875 let state = accounts.get(&request.account_id).unwrap_or(&empty);
3876
3877 if let Some(name) = db_parameter_group_name {
3879 let pg = state.parameter_groups.get(&name).ok_or_else(|| {
3880 AwsServiceError::aws_error(
3881 StatusCode::NOT_FOUND,
3882 "DBParameterGroupNotFound",
3883 format!("DBParameterGroup {} not found.", name),
3884 )
3885 })?;
3886
3887 return Ok(AwsResponse::xml(
3888 StatusCode::OK,
3889 query_response_xml(
3890 "DescribeDBParameterGroups", RDS_NS,
3891 &format!(
3892 "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
3893 db_parameter_group_xml(pg)
3894 ),
3895 &request.request_id,
3896 ),
3897 ));
3898 }
3899
3900 let mut parameter_groups: Vec<DbParameterGroup> =
3902 state.parameter_groups.values().cloned().collect();
3903 parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
3904
3905 let paginated = paginate(parameter_groups, marker, max_records, |pg| {
3907 &pg.db_parameter_group_name
3908 })?;
3909
3910 let marker_xml = paginated
3911 .next_marker
3912 .as_ref()
3913 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
3914 .unwrap_or_default();
3915
3916 let body = paginated
3917 .items
3918 .iter()
3919 .map(|pg| {
3920 format!(
3921 "<DBParameterGroup>{}</DBParameterGroup>",
3922 db_parameter_group_xml(pg)
3923 )
3924 })
3925 .collect::<Vec<_>>()
3926 .join("");
3927
3928 Ok(AwsResponse::xml(
3929 StatusCode::OK,
3930 query_response_xml(
3931 "DescribeDBParameterGroups",
3932 RDS_NS,
3933 &format!(
3934 "<DBParameterGroups>{}</DBParameterGroups>{}",
3935 body, marker_xml
3936 ),
3937 &request.request_id,
3938 ),
3939 ))
3940 }
3941
3942 fn delete_db_parameter_group(
3943 &self,
3944 request: &AwsRequest,
3945 ) -> Result<AwsResponse, AwsServiceError> {
3946 let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
3947
3948 let arn = {
3949 let mut accounts = self.state.write();
3950 let state = accounts.get_or_create(&request.account_id);
3951
3952 if db_parameter_group_name.starts_with("default.") {
3953 return Err(AwsServiceError::aws_error(
3954 StatusCode::BAD_REQUEST,
3955 "InvalidDBParameterGroupState",
3956 "Cannot delete default parameter groups.",
3957 ));
3958 }
3959
3960 let removed = state
3961 .parameter_groups
3962 .remove(&db_parameter_group_name)
3963 .ok_or_else(|| {
3964 AwsServiceError::aws_error(
3965 StatusCode::NOT_FOUND,
3966 "DBParameterGroupNotFound",
3967 format!("DBParameterGroup {db_parameter_group_name} not found."),
3968 )
3969 })?;
3970 removed.db_parameter_group_arn
3971 };
3972
3973 self.emit_event(
3974 RdsSourceType::DbParameterGroup,
3975 &db_parameter_group_name,
3976 &arn,
3977 "RDS-EVENT-0064",
3978 &["deletion"],
3979 "DB parameter group deleted",
3980 );
3981
3982 Ok(AwsResponse::xml(
3983 StatusCode::OK,
3984 query_response_xml("DeleteDBParameterGroup", RDS_NS, "", &request.request_id),
3985 ))
3986 }
3987
3988 fn modify_db_parameter_group(
3989 &self,
3990 request: &AwsRequest,
3991 ) -> Result<AwsResponse, AwsServiceError> {
3992 let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
3993
3994 let parsed_params = parse_db_parameter_members(request);
3999
4000 let mut accounts = self.state.write();
4001 let state = accounts.get_or_create(&request.account_id);
4002
4003 let parameter_group = state
4004 .parameter_groups
4005 .get_mut(&db_parameter_group_name)
4006 .ok_or_else(|| {
4007 AwsServiceError::aws_error(
4008 StatusCode::NOT_FOUND,
4009 "DBParameterGroupNotFound",
4010 format!("DBParameterGroup {db_parameter_group_name} not found."),
4011 )
4012 })?;
4013
4014 if let Some(new_description) = optional_query_param(request, "Description") {
4015 parameter_group.description = new_description;
4016 }
4017
4018 for (name, value) in parsed_params {
4019 parameter_group.parameters.insert(name, value);
4020 }
4021
4022 let parameter_group_clone = parameter_group.clone();
4023 let arn = parameter_group_clone.db_parameter_group_arn.clone();
4024 drop(accounts);
4025
4026 self.emit_event(
4027 RdsSourceType::DbParameterGroup,
4028 &db_parameter_group_name,
4029 &arn,
4030 "RDS-EVENT-0037",
4031 &["configuration change"],
4032 "DB parameter group modified",
4033 );
4034
4035 Ok(AwsResponse::xml(
4036 StatusCode::OK,
4037 query_response_xml(
4038 "ModifyDBParameterGroup",
4039 RDS_NS,
4040 &format!(
4041 "<DBParameterGroupName>{}</DBParameterGroupName>",
4042 xml_escape(¶meter_group_clone.db_parameter_group_name)
4043 ),
4044 &request.request_id,
4045 ),
4046 ))
4047 }
4048
4049 fn describe_db_parameters_real(
4050 &self,
4051 request: &AwsRequest,
4052 ) -> Result<AwsResponse, AwsServiceError> {
4053 let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
4054 let source_filter = optional_query_param(request, "Source");
4055
4056 let accounts = self.state.read();
4057 let state = match accounts.get(&request.account_id) {
4058 Some(s) => s,
4059 None => {
4060 return Err(AwsServiceError::aws_error(
4061 StatusCode::NOT_FOUND,
4062 "DBParameterGroupNotFound",
4063 format!("DBParameterGroup {db_parameter_group_name} not found."),
4064 ));
4065 }
4066 };
4067 let parameter_group = state
4068 .parameter_groups
4069 .get(&db_parameter_group_name)
4070 .ok_or_else(|| {
4071 AwsServiceError::aws_error(
4072 StatusCode::NOT_FOUND,
4073 "DBParameterGroupNotFound",
4074 format!("DBParameterGroup {db_parameter_group_name} not found."),
4075 )
4076 })?;
4077
4078 let source = source_filter.as_deref();
4086 let include_user = source.is_none_or(|s| s == "user");
4087 let include_engine_default = source.is_none_or(|s| s == "engine-default");
4088 let mut members_xml = String::new();
4089 if include_user {
4090 for (name, value) in ¶meter_group.parameters {
4091 members_xml.push_str(&render_user_parameter_xml(name, value));
4092 }
4093 }
4094 if include_engine_default {
4095 for default in
4100 crate::state::engine_default_parameters(¶meter_group.db_parameter_group_family)
4101 {
4102 if parameter_group.parameters.contains_key(default.name) {
4103 continue;
4104 }
4105 members_xml.push_str(&render_engine_default_parameter_xml(default));
4106 }
4107 }
4108 let body = format!(" <Parameters>\n{members_xml} </Parameters>");
4109 Ok(AwsResponse::xml(
4110 StatusCode::OK,
4111 query_response_xml("DescribeDBParameters", RDS_NS, &body, &request.request_id),
4112 ))
4113 }
4114}
4115
4116pub(crate) fn render_user_parameter_xml(name: &str, value: &str) -> String {
4120 format!(
4121 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>user</Source>\n <ApplyType>dynamic</ApplyType>\n <DataType>string</DataType>\n <IsModifiable>true</IsModifiable>\n </Parameter>\n",
4122 xml_escape(name),
4123 xml_escape(value),
4124 )
4125}
4126
4127pub(crate) fn render_engine_default_parameter_xml(
4131 default: &crate::state::EngineDefaultParameter,
4132) -> String {
4133 format!(
4134 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>engine-default</Source>\n <ApplyType>{}</ApplyType>\n <DataType>{}</DataType>\n <AllowedValues>{}</AllowedValues>\n <IsModifiable>{}</IsModifiable>\n </Parameter>\n",
4135 xml_escape(default.name),
4136 xml_escape(default.value),
4137 xml_escape(default.apply_type),
4138 xml_escape(default.data_type),
4139 xml_escape(default.allowed_values),
4140 default.is_modifiable,
4141 )
4142}
4143
4144pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<(String, String)> {
4152 let mut out = Vec::new();
4153 for prefix in ["Parameters.Parameter", "Parameters.member"] {
4154 let mut index = 1;
4155 loop {
4156 let name_key = format!("{prefix}.{index}.ParameterName");
4157 let value_key = format!("{prefix}.{index}.ParameterValue");
4158 let name = optional_query_param(request, &name_key);
4159 let value = optional_query_param(request, &value_key);
4160 if name.is_none() && value.is_none() {
4161 break;
4162 }
4163 if let (Some(n), Some(v)) = (name, value) {
4164 if !n.is_empty() {
4165 out.push((n, v));
4166 }
4167 }
4168 index += 1;
4169 }
4170 }
4171 out
4172}
4173
/// Translates an RDS-style log file name into a path inside the database
/// container.
///
/// Both PostgreSQL names map to the same file for every engine. The MySQL
/// names are translated only for the `mysql` and `mariadb` engines. Any
/// other name is returned unchanged.
fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
    let mysql_family = matches!(engine, "mysql" | "mariadb");
    let resolved = match log_file_name {
        "error/postgres.log" | "trace/postgres-trace.log" => "/var/log/postgresql/postgresql.log",
        "error/mysql-error.log" if mysql_family => "/var/log/mysql/error.log",
        "slowquery/mysql-slowquery.log" if mysql_family => "/var/log/mysql/slow.log",
        unmapped => unmapped,
    };
    resolved.to_owned()
}
4188
/// One page of a paginated listing.
///
/// NOTE(review): presumably the value returned by `paginate`; the describe
/// handlers in this file read both fields from its result. Confirm against
/// the `paginate` definition.
pub(crate) struct PaginationResult<T> {
    /// The entries that belong to this page.
    items: Vec<T>,
    /// Marker to echo back in the response's `<Marker>` element; `None`
    /// means no marker element is emitted.
    next_marker: Option<String>,
}
4193
4194fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
4198 use serde_json::{json, Value};
4199 let Some(map) = state.extras.get_mut("clusters") else {
4200 return;
4201 };
4202 let Some(entry) = map.get_mut(cluster_id) else {
4203 return;
4204 };
4205 let Some(obj) = entry.as_object_mut() else {
4206 return;
4207 };
4208 let mut members: Vec<Value> = obj
4209 .get("DBClusterMembers")
4210 .and_then(|v| v.as_array())
4211 .cloned()
4212 .unwrap_or_default();
4213 if members
4214 .iter()
4215 .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
4216 {
4217 return;
4218 }
4219 let has_writer = members
4220 .iter()
4221 .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
4222 let promotion_tier = (members.len() as i64) + 1;
4223 members.push(json!({
4224 "DBInstanceIdentifier": instance_id,
4225 "IsClusterWriter": !has_writer,
4226 "DBClusterParameterGroupStatus": "in-sync",
4227 "PromotionTier": promotion_tier,
4228 }));
4229 obj.insert("DBClusterMembers".to_string(), Value::Array(members));
4230 if !has_writer {
4231 obj.insert(
4232 "WriterDBInstanceIdentifier".to_string(),
4233 Value::String(instance_id.to_string()),
4234 );
4235 }
4236}
4237
4238#[path = "service_helpers.rs"]
4239mod service_helpers;
4240pub(crate) use service_helpers::*;
4241
4242#[cfg(test)]
4243#[path = "service_tests.rs"]
4244mod tests;