1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18 default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19 DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20 SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
/// XML namespace handed to `query_response_xml` when building RDS query-protocol responses.
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
/// Action names reported by [`AwsService::supported_actions`] for this service.
///
/// Only part of this list is dispatched directly in `handle`; every other action name falls
/// through to `handle_extra_action`.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddRoleToDBCluster",
    "AddRoleToDBInstance",
    "AddSourceIdentifierToSubscription",
    "AddTagsToResource",
    "ApplyPendingMaintenanceAction",
    "AuthorizeDBSecurityGroupIngress",
    "BacktrackDBCluster",
    "CancelExportTask",
    "CopyDBClusterParameterGroup",
    "CopyDBClusterSnapshot",
    "CopyDBParameterGroup",
    "CopyDBSnapshot",
    "CopyOptionGroup",
    "CreateBlueGreenDeployment",
    "CreateCustomDBEngineVersion",
    "CreateDBCluster",
    "CreateDBClusterEndpoint",
    "CreateDBClusterParameterGroup",
    "CreateDBClusterSnapshot",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBProxy",
    "CreateDBProxyEndpoint",
    "CreateDBSecurityGroup",
    "CreateDBShardGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "CreateEventSubscription",
    "CreateGlobalCluster",
    "CreateIntegration",
    "CreateOptionGroup",
    "CreateTenantDatabase",
    "DeleteBlueGreenDeployment",
    "DeleteCustomDBEngineVersion",
    "DeleteDBCluster",
    "DeleteDBClusterAutomatedBackup",
    "DeleteDBClusterEndpoint",
    "DeleteDBClusterParameterGroup",
    "DeleteDBClusterSnapshot",
    "DeleteDBInstance",
    "DeleteDBInstanceAutomatedBackup",
    "DeleteDBParameterGroup",
    "DeleteDBProxy",
    "DeleteDBProxyEndpoint",
    "DeleteDBSecurityGroup",
    "DeleteDBShardGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DeleteEventSubscription",
    "DeleteGlobalCluster",
    "DeleteIntegration",
    "DeleteOptionGroup",
    "DeleteTenantDatabase",
    "DeregisterDBProxyTargets",
    "DescribeAccountAttributes",
    "DescribeBlueGreenDeployments",
    "DescribeCertificates",
    "DescribeDBClusterAutomatedBackups",
    "DescribeDBClusterBacktracks",
    "DescribeDBClusterEndpoints",
    "DescribeDBClusterParameterGroups",
    "DescribeDBClusterParameters",
    "DescribeDBClusterSnapshotAttributes",
    "DescribeDBClusterSnapshots",
    "DescribeDBClusters",
    "DescribeDBEngineVersions",
    "DescribeDBInstanceAutomatedBackups",
    "DescribeDBInstances",
    "DescribeDBLogFiles",
    "DescribeDBMajorEngineVersions",
    "DescribeDBParameterGroups",
    "DescribeDBParameters",
    "DescribeDBProxies",
    "DescribeDBProxyEndpoints",
    "DescribeDBProxyTargetGroups",
    "DescribeDBProxyTargets",
    "DescribeDBRecommendations",
    "DescribeDBSecurityGroups",
    "DescribeDBShardGroups",
    "DescribeDBSnapshotAttributes",
    "DescribeDBSnapshotTenantDatabases",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeEngineDefaultClusterParameters",
    "DescribeEngineDefaultParameters",
    "DescribeEventCategories",
    "DescribeEventSubscriptions",
    "DescribeEvents",
    "DescribeExportTasks",
    "DescribeGlobalClusters",
    "DescribeIntegrations",
    "DescribeOptionGroupOptions",
    "DescribeOptionGroups",
    "DescribeOrderableDBInstanceOptions",
    "DescribePendingMaintenanceActions",
    "DescribeReservedDBInstances",
    "DescribeReservedDBInstancesOfferings",
    "DescribeSourceRegions",
    "DescribeTenantDatabases",
    "DescribeValidDBInstanceModifications",
    "DisableHttpEndpoint",
    "DownloadDBLogFilePortion",
    "EnableHttpEndpoint",
    "FailoverDBCluster",
    "FailoverGlobalCluster",
    "ListTagsForResource",
    "ModifyActivityStream",
    "ModifyCertificates",
    "ModifyCurrentDBClusterCapacity",
    "ModifyCustomDBEngineVersion",
    "ModifyDBCluster",
    "ModifyDBClusterEndpoint",
    "ModifyDBClusterParameterGroup",
    "ModifyDBClusterSnapshotAttribute",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBProxy",
    "ModifyDBProxyEndpoint",
    "ModifyDBProxyTargetGroup",
    "ModifyDBRecommendation",
    "ModifyDBShardGroup",
    "ModifyDBSnapshot",
    "ModifyDBSnapshotAttribute",
    "ModifyDBSubnetGroup",
    "ModifyEventSubscription",
    "ModifyGlobalCluster",
    "ModifyIntegration",
    "ModifyOptionGroup",
    "ModifyTenantDatabase",
    "PromoteReadReplica",
    "PromoteReadReplicaDBCluster",
    "PurchaseReservedDBInstancesOffering",
    "RebootDBCluster",
    "RebootDBInstance",
    "RebootDBShardGroup",
    "RegisterDBProxyTargets",
    "RemoveFromGlobalCluster",
    "RemoveRoleFromDBCluster",
    "RemoveRoleFromDBInstance",
    "RemoveSourceIdentifierFromSubscription",
    "RemoveTagsFromResource",
    "ResetDBClusterParameterGroup",
    "ResetDBParameterGroup",
    "RestoreDBClusterFromS3",
    "RestoreDBClusterFromSnapshot",
    "RestoreDBClusterToPointInTime",
    "RestoreDBInstanceFromDBSnapshot",
    "RestoreDBInstanceFromS3",
    "RestoreDBInstanceToPointInTime",
    "RevokeDBSecurityGroupIngress",
    "StartActivityStream",
    "StartDBCluster",
    "StartDBInstance",
    "StartDBInstanceAutomatedBackupsReplication",
    "StartExportTask",
    "StopActivityStream",
    "StopDBCluster",
    "StopDBInstance",
    "StopDBInstanceAutomatedBackupsReplication",
    "SwitchoverBlueGreenDeployment",
    "SwitchoverGlobalCluster",
    "SwitchoverReadReplica",
];
190
/// RDS service implementation: request handlers operate on the shared state and, when
/// configured, on a container runtime, a snapshot store and an event delivery bus.
pub struct RdsService {
    /// Shared RDS state; handlers look up the per-account entry by `request.account_id`.
    pub(crate) state: SharedRdsState,
    /// Container runtime used to provision databases. `None` makes runtime-dependent
    /// operations fail with a 503 error.
    runtime: Option<Arc<RdsRuntime>>,
    /// Destination for serialized state snapshots. `None` disables snapshot writes.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Held while a snapshot is serialized and written, so snapshot writes do not overlap.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Bus handed to the event emitters. `None` when no bus was attached.
    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
}
198
/// Kind of RDS resource an emitted event refers to.
#[derive(Clone, Copy)]
// NOTE(review): presumably not every variant is constructed yet, and the shared `Db` prefix
// mirrors the RDS naming; confirm before dropping either lint exemption.
#[allow(dead_code, clippy::enum_variant_names)]
pub(crate) enum RdsSourceType {
    DbInstance,
    DbSnapshot,
    DbParameterGroup,
    DbCluster,
    DbClusterSnapshot,
}

impl RdsSourceType {
    /// Single lookup table for the three textual forms of a source type, in the order
    /// `(as_str, describe_events_str, detail_type)`, so a new variant is added in one place.
    fn labels(self) -> (&'static str, &'static str, &'static str) {
        match self {
            Self::DbInstance => ("DB_INSTANCE", "db-instance", "RDS DB Instance Event"),
            Self::DbSnapshot => ("DB_SNAPSHOT", "db-snapshot", "RDS DB Snapshot Event"),
            Self::DbParameterGroup => (
                "DB_PARAMETER_GROUP",
                "db-parameter-group",
                "RDS DB Parameter Group Event",
            ),
            Self::DbCluster => ("DB_CLUSTER", "db-cluster", "RDS DB Cluster Event"),
            Self::DbClusterSnapshot => (
                "DB_CLUSTER_SNAPSHOT",
                "db-cluster-snapshot",
                "RDS DB Cluster Snapshot Event",
            ),
        }
    }

    /// Upper-snake-case label for the source type (e.g. `DB_INSTANCE`).
    fn as_str(self) -> &'static str {
        let (upper_snake, _, _) = self.labels();
        upper_snake
    }

    /// Lower-kebab-case label (e.g. `db-instance`); presumably the `SourceType` value used
    /// by `DescribeEvents` — confirm against the caller.
    pub(crate) fn describe_events_str(self) -> &'static str {
        let (_, kebab, _) = self.labels();
        kebab
    }

    /// Human-readable event title (e.g. `RDS DB Instance Event`).
    fn detail_type(self) -> &'static str {
        let (_, _, title) = self.labels();
        title
    }
}
247
248impl RdsService {
249 pub(crate) fn state_handle(&self) -> &SharedRdsState {
250 &self.state
251 }
252}
253
254impl RdsService {
255 pub fn new(state: SharedRdsState) -> Self {
256 Self {
257 state,
258 runtime: None,
259 snapshot_store: None,
260 snapshot_lock: Arc::new(AsyncMutex::new(())),
261 delivery_bus: None,
262 }
263 }
264
265 pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
266 self.runtime = Some(runtime);
267 self
268 }
269
270 pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
274 self.runtime.as_ref()
275 }
276
277 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
278 self.snapshot_store = Some(store);
279 self
280 }
281
282 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
283 self.delivery_bus = Some(bus);
284 self
285 }
286
287 pub(crate) fn emit_event(
292 &self,
293 source_type: RdsSourceType,
294 source_identifier: &str,
295 source_arn: &str,
296 event_id: &str,
297 event_categories: &[&str],
298 message: &str,
299 ) {
300 let account_id = source_arn.split(':').nth(4).unwrap_or("");
303 emit_event_static_with_state(
304 self.delivery_bus.as_ref(),
305 Some(&self.state),
306 if account_id.is_empty() {
307 None
308 } else {
309 Some(account_id)
310 },
311 source_type,
312 source_identifier,
313 source_arn,
314 event_id,
315 event_categories,
316 message,
317 );
318 }
319
320 async fn save_snapshot(&self) {
321 save_snapshot_static(
322 self.state.clone(),
323 self.snapshot_store.clone(),
324 self.snapshot_lock.clone(),
325 )
326 .await;
327 }
328}
329
330async fn save_snapshot_static(
335 state: SharedRdsState,
336 store: Option<Arc<dyn SnapshotStore>>,
337 lock: Arc<AsyncMutex<()>>,
338) {
339 let Some(store) = store else {
340 return;
341 };
342 let _guard = lock.lock().await;
343 let snapshot = RdsSnapshot {
344 schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
345 state: None,
346 accounts: Some(state.read().clone()),
347 };
348 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
349 let bytes = serde_json::to_vec(&snapshot)
350 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
351 store.save(&bytes)
352 })
353 .await;
354 match join {
355 Ok(Ok(())) => {}
356 Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
357 Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
358 }
359}
360
361impl RdsService {
362 fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
368 self.runtime.as_ref().ok_or_else(|| {
369 AwsServiceError::aws_error(
370 StatusCode::SERVICE_UNAVAILABLE,
371 "InvalidParameterValue",
372 "Docker/Podman is required for RDS DB instances but is not available",
373 )
374 })
375 }
376}
377
378#[async_trait]
379impl AwsService for RdsService {
380 fn service_name(&self) -> &str {
381 "rds"
382 }
383
384 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
385 let mutates = is_mutating_action(request.action.as_str());
386 let result = match request.action.as_str() {
387 "AddTagsToResource" => self.add_tags_to_resource(&request),
388 "CreateDBInstance" => self.create_db_instance(&request).await,
389 "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
390 "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
391 "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
392 "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
393 "DeleteDBInstance" => self.delete_db_instance(&request).await,
394 "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
395 "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
396 "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
397 "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
398 "DescribeDBInstances" => self.describe_db_instances(&request),
399 "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
400 "DescribeDBParameters" => self.describe_db_parameters_real(&request),
401 "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
402 "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
403 "DescribeOrderableDBInstanceOptions" => {
404 self.describe_orderable_db_instance_options(&request)
405 }
406 "ListTagsForResource" => self.list_tags_for_resource(&request),
407 "ModifyDBInstance" => self.modify_db_instance(&request),
408 "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
409 "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
410 "RebootDBInstance" => self.reboot_db_instance(&request).await,
411 "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
412 "RestoreDBInstanceFromDBSnapshot" => {
413 self.restore_db_instance_from_db_snapshot(&request).await
414 }
415 "RestoreDBInstanceToPointInTime" => {
416 self.restore_db_instance_to_point_in_time(&request).await
417 }
418 "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
419 "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
420 "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
421 "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
422 "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
423 "RestoreDBClusterToPointInTime" => {
424 self.restore_db_cluster_to_point_in_time(&request).await
425 }
426 _ => self.handle_extra_action(&request),
427 };
428 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
429 self.save_snapshot().await;
430 }
431 result
432 }
433
434 fn supported_actions(&self) -> &[&str] {
435 SUPPORTED_ACTIONS
436 }
437}
438
439impl RdsService {
440 async fn create_db_instance(
441 &self,
442 request: &AwsRequest,
443 ) -> Result<AwsResponse, AwsServiceError> {
444 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
445 let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
446 let db_instance_class = required_query_param(request, "DBInstanceClass")?;
447 let engine = required_query_param(request, "Engine")?;
448 let master_username = required_query_param(request, "MasterUsername")?;
449 let master_user_password = required_query_param(request, "MasterUserPassword")?;
450 let db_name = optional_query_param(request, "DBName");
451 let engine_version =
452 optional_query_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
453 let publicly_accessible =
454 parse_optional_bool(optional_query_param(request, "PubliclyAccessible").as_deref())?
455 .unwrap_or(true);
456 let deletion_protection =
457 parse_optional_bool(optional_query_param(request, "DeletionProtection").as_deref())?
458 .unwrap_or(false);
459 let port = optional_i32_param(request, "Port")?
460 .unwrap_or_else(|| default_port_for_engine(&engine));
461 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
462
463 let db_parameter_group_name = optional_query_param(request, "DBParameterGroupName")
464 .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
465
466 let backup_retention_period =
467 optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
468 let preferred_backup_window = optional_query_param(request, "PreferredBackupWindow")
469 .unwrap_or_else(|| "03:00-04:00".to_string());
470 let option_group_name = optional_query_param(request, "OptionGroupName");
471 let multi_az = parse_optional_bool(optional_query_param(request, "MultiAZ").as_deref())?
472 .unwrap_or(false);
473 let availability_zone = optional_query_param(request, "AvailabilityZone");
474 let storage_type = optional_query_param(request, "StorageType");
475 let storage_encrypted =
476 parse_optional_bool(optional_query_param(request, "StorageEncrypted").as_deref())?
477 .unwrap_or(false);
478 let kms_key_id = optional_query_param(request, "KmsKeyId");
479 let iam_database_authentication_enabled = parse_optional_bool(
480 optional_query_param(request, "EnableIAMDatabaseAuthentication").as_deref(),
481 )?
482 .unwrap_or(false);
483 let iops = optional_i32_param(request, "Iops")?;
484 let monitoring_interval = optional_i32_param(request, "MonitoringInterval")?;
485 let monitoring_role_arn = optional_query_param(request, "MonitoringRoleArn");
486 let performance_insights_enabled = parse_optional_bool(
487 optional_query_param(request, "EnablePerformanceInsights").as_deref(),
488 )?
489 .unwrap_or(false);
490 let performance_insights_kms_key_id =
491 optional_query_param(request, "PerformanceInsightsKMSKeyId");
492 let performance_insights_retention_period =
493 optional_i32_param(request, "PerformanceInsightsRetentionPeriod")?;
494 let enabled_cloudwatch_logs_exports =
495 parse_cloudwatch_logs_exports(request, "EnableCloudwatchLogsExports");
496 let ca_certificate_identifier = optional_query_param(request, "CACertificateIdentifier");
497 let network_type = optional_query_param(request, "NetworkType");
498 let character_set_name = optional_query_param(request, "CharacterSetName");
499 let auto_minor_version_upgrade = parse_optional_bool(
500 optional_query_param(request, "AutoMinorVersionUpgrade").as_deref(),
501 )?;
502 let copy_tags_to_snapshot =
503 parse_optional_bool(optional_query_param(request, "CopyTagsToSnapshot").as_deref())?;
504 let db_cluster_identifier = optional_query_param(request, "DBClusterIdentifier");
505
506 validate_create_request(
507 &db_instance_identifier,
508 allocated_storage,
509 &db_instance_class,
510 &engine,
511 &engine_version,
512 port,
513 )?;
514
515 {
516 let mut accounts = self.state.write();
517 let state = accounts.get_or_create(&request.account_id);
518 if !state.begin_instance_creation(&db_instance_identifier) {
519 return Err(AwsServiceError::aws_error(
520 StatusCode::BAD_REQUEST,
521 "DBInstanceAlreadyExists",
522 format!("DBInstance {} already exists.", db_instance_identifier),
523 ));
524 }
525 if let Some(ref pg_name) = db_parameter_group_name {
527 if !state.parameter_groups.contains_key(pg_name) {
528 state.cancel_instance_creation(&db_instance_identifier);
529 return Err(AwsServiceError::aws_error(
530 StatusCode::NOT_FOUND,
531 "DBParameterGroupNotFound",
532 format!("DBParameterGroup {} not found.", pg_name),
533 ));
534 }
535 }
536 }
537
538 let runtime = self.require_runtime()?.clone();
539
540 let logical_db_name = db_name
541 .clone()
542 .unwrap_or_else(|| default_db_name(&engine).to_string());
543
544 let created_at = Utc::now();
550 let instance = {
551 let mut accounts = self.state.write();
552 let state = accounts.get_or_create(&request.account_id);
553 let placeholder = DbInstance {
554 db_instance_identifier: db_instance_identifier.clone(),
555 db_instance_arn: state.db_instance_arn(&db_instance_identifier),
556 db_instance_class: db_instance_class.clone(),
557 engine: engine.clone(),
558 engine_version: engine_version.clone(),
559 db_instance_status: "creating".to_string(),
560 master_username: master_username.clone(),
561 db_name: db_name.clone(),
562 endpoint_address: String::new(),
563 port: 0,
564 allocated_storage,
565 publicly_accessible,
566 deletion_protection,
567 created_at,
568 dbi_resource_id: state.next_dbi_resource_id(),
569 master_user_password: master_user_password.clone(),
570 container_id: String::new(),
571 host_port: 0,
572 tags: Vec::new(),
573 read_replica_source_db_instance_identifier: None,
574 read_replica_db_instance_identifiers: Vec::new(),
575 vpc_security_group_ids,
576 db_parameter_group_name,
577 backup_retention_period,
578 preferred_backup_window,
579 preferred_maintenance_window: None,
580 latest_restorable_time: if backup_retention_period > 0 {
581 Some(created_at)
582 } else {
583 None
584 },
585 option_group_name,
586 multi_az,
587 pending_modified_values: None,
588 availability_zone,
589 storage_type,
590 storage_encrypted,
591 kms_key_id,
592 iam_database_authentication_enabled,
593 iops,
594 monitoring_interval,
595 monitoring_role_arn,
596 performance_insights_enabled,
597 performance_insights_kms_key_id,
598 performance_insights_retention_period,
599 enabled_cloudwatch_logs_exports,
600 ca_certificate_identifier,
601 network_type,
602 character_set_name,
603 auto_minor_version_upgrade,
604 copy_tags_to_snapshot,
605 master_user_secret_arn: None,
606 master_user_secret_kms_key_id: None,
607 license_model: None,
608 max_allocated_storage: None,
609 multi_tenant: None,
610 storage_throughput: None,
611 tde_credential_arn: None,
612 delete_automated_backups: None,
613 db_security_groups: Vec::new(),
614 domain: None,
615 domain_fqdn: None,
616 domain_ou: None,
617 domain_iam_role_name: None,
618 domain_auth_secret_arn: None,
619 domain_dns_ips: Vec::new(),
620 db_cluster_identifier: db_cluster_identifier.clone(),
621 };
622 state.finish_instance_creation(placeholder.clone());
623 placeholder
624 };
625 let instance_arn = instance.db_instance_arn.clone();
626
627 self.emit_event(
628 RdsSourceType::DbInstance,
629 &db_instance_identifier,
630 &instance_arn,
631 "RDS-EVENT-0005",
632 &["creation"],
633 "DB instance created",
634 );
635
636 {
637 let state_handle = self.state.clone();
638 let delivery_bus = self.delivery_bus.clone();
639 let runtime = runtime.clone();
640 let id = db_instance_identifier.clone();
641 let engine = engine.clone();
642 let engine_version = engine_version.clone();
643 let master_username = master_username.clone();
644 let master_user_password = master_user_password.clone();
645 let logical_db_name_task = logical_db_name.clone();
646 let account_id = request.account_id.clone();
647 let region = request.region.clone();
648 let arn = instance_arn.clone();
649 let snapshot_store = self.snapshot_store.clone();
650 let snapshot_lock = self.snapshot_lock.clone();
651 let cluster_id_for_attach = db_cluster_identifier.clone();
652 tokio::spawn(async move {
653 match runtime
654 .ensure_postgres(
655 &id,
656 &engine,
657 &engine_version,
658 &master_username,
659 &master_user_password,
660 &logical_db_name_task,
661 &account_id,
662 ®ion,
663 )
664 .await
665 {
666 Ok(running) => {
667 let pending_dump = if let Some(ref cid) = cluster_id_for_attach {
674 let mut accounts = state_handle.write();
675 let state = accounts.get_or_create(&account_id);
676 state
677 .extras
678 .get_mut("clusters")
679 .and_then(|m| m.get_mut(cid))
680 .and_then(|entry| entry.as_object_mut())
681 .and_then(|obj| obj.remove("PendingRestoreDumpB64"))
682 .and_then(|v| v.as_str().map(str::to_string))
683 .and_then(|b64| {
684 use base64::Engine;
685 base64::engine::general_purpose::STANDARD
686 .decode(b64.as_bytes())
687 .ok()
688 })
689 } else {
690 None
691 };
692 if let Some(dump) = pending_dump {
693 if let Err(error) = runtime
694 .restore_database(
695 &id,
696 &engine,
697 &master_username,
698 &master_user_password,
699 &logical_db_name_task,
700 &dump,
701 )
702 .await
703 {
704 tracing::error!(%error, db_instance_identifier=%id, "cluster restore dump replay failed");
705 }
706 }
707
708 {
709 let mut accounts = state_handle.write();
710 let state = accounts.get_or_create(&account_id);
711 if let Some(inst) = state.instances.get_mut(&id) {
712 inst.db_instance_status = "available".to_string();
713 inst.endpoint_address = "127.0.0.1".to_string();
714 inst.port = i32::from(running.host_port);
715 inst.host_port = running.host_port;
716 inst.container_id = running.container_id;
717 }
718 if let Some(ref cid) = cluster_id_for_attach {
721 attach_cluster_member(state, cid, &id);
722 }
723 }
724 save_snapshot_static(
729 state_handle.clone(),
730 snapshot_store.clone(),
731 snapshot_lock.clone(),
732 )
733 .await;
734 }
735 Err(error) => {
736 tracing::error!(%error, db_instance_identifier=%id, "create_db_instance background task failed");
737 {
738 let mut accounts = state_handle.write();
739 let state = accounts.get_or_create(&account_id);
740 state.instances.remove(&id);
741 }
742 save_snapshot_static(
743 state_handle.clone(),
744 snapshot_store.clone(),
745 snapshot_lock.clone(),
746 )
747 .await;
748 emit_event_static(
749 delivery_bus.as_ref(),
750 RdsSourceType::DbInstance,
751 &id,
752 &arn,
753 "RDS-EVENT-0058",
754 &["failure"],
755 &format!("DB instance failed to create: {}", error),
756 );
757 }
758 }
759 });
760 }
761
762 Ok(AwsResponse::xml(
763 StatusCode::OK,
764 query_response_xml(
765 "CreateDBInstance",
766 RDS_NS,
767 &format!(
768 "<DBInstance>{}</DBInstance>",
769 db_instance_xml(&instance, None)
770 ),
771 &request.request_id,
772 ),
773 ))
774 }
775
776 async fn delete_db_instance(
777 &self,
778 request: &AwsRequest,
779 ) -> Result<AwsResponse, AwsServiceError> {
780 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
781 let skip_final_snapshot =
782 parse_optional_bool(optional_query_param(request, "SkipFinalSnapshot").as_deref())?
783 .unwrap_or(false);
784 let final_db_snapshot_identifier =
785 optional_query_param(request, "FinalDBSnapshotIdentifier");
786
787 if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
788 return Err(AwsServiceError::aws_error(
789 StatusCode::BAD_REQUEST,
790 "InvalidParameterCombination",
791 "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
792 ));
793 }
794 if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
795 return Err(AwsServiceError::aws_error(
796 StatusCode::BAD_REQUEST,
797 "InvalidParameterCombination",
798 "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
799 ));
800 }
801
802 {
804 let accounts = self.state.read();
805 let empty = RdsState::new(&request.account_id, &request.region);
806 let state = accounts.get(&request.account_id).unwrap_or(&empty);
807 if let Some(instance) = state.instances.get(&db_instance_identifier) {
808 if instance.deletion_protection {
809 return Err(AwsServiceError::aws_error(
810 StatusCode::BAD_REQUEST,
811 "InvalidDBInstanceState",
812 format!(
813 "DBInstance {} cannot be deleted because deletion protection is enabled.",
814 db_instance_identifier
815 ),
816 ));
817 }
818 } else {
819 return Err(db_instance_not_found(&db_instance_identifier));
820 }
821 }
822
823 if let Some(ref snapshot_id) = final_db_snapshot_identifier {
824 self.create_final_db_snapshot(
825 &db_instance_identifier,
826 snapshot_id,
827 &request.account_id,
828 &request.region,
829 )
830 .await?;
831 }
832
833 let instance = {
834 let mut accounts = self.state.write();
835 let state = accounts.get_or_create(&request.account_id);
836 let instance = state
837 .instances
838 .remove(&db_instance_identifier)
839 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
840
841 if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
842 if let Some(source) = state.instances.get_mut(source_id) {
843 source
844 .read_replica_db_instance_identifiers
845 .retain(|id| id != &db_instance_identifier);
846 }
847 }
848
849 for replica_id in &instance.read_replica_db_instance_identifiers {
850 if let Some(replica) = state.instances.get_mut(replica_id) {
851 replica.read_replica_source_db_instance_identifier = None;
852 }
853 }
854
855 instance
856 };
857
858 if let Some(runtime) = &self.runtime {
859 runtime.stop_container(&db_instance_identifier).await;
860 }
861
862 self.emit_event(
863 RdsSourceType::DbInstance,
864 &db_instance_identifier,
865 &instance.db_instance_arn,
866 "RDS-EVENT-0003",
867 &["deletion"],
868 "DB instance deleted",
869 );
870
871 Ok(AwsResponse::xml(
872 StatusCode::OK,
873 query_response_xml(
874 "DeleteDBInstance",
875 RDS_NS,
876 &format!(
877 "<DBInstance>{}</DBInstance>",
878 db_instance_xml(&instance, Some("deleting"))
879 ),
880 &request.request_id,
881 ),
882 ))
883 }
884
885 async fn create_final_db_snapshot(
891 &self,
892 db_instance_identifier: &str,
893 snapshot_id: &str,
894 account_id: &str,
895 region: &str,
896 ) -> Result<(), AwsServiceError> {
897 let runtime = self.runtime.as_ref().ok_or_else(|| {
898 AwsServiceError::aws_error(
899 StatusCode::SERVICE_UNAVAILABLE,
900 "InvalidParameterValue",
901 "Docker/Podman is required for RDS snapshots but is not available",
902 )
903 })?;
904
905 let (instance_for_snapshot, db_name) = {
906 let accounts = self.state.read();
907 let empty = RdsState::new(account_id, region);
908 let state = accounts.get(account_id).unwrap_or(&empty);
909
910 if state.snapshots.contains_key(snapshot_id) {
911 return Err(AwsServiceError::aws_error(
912 StatusCode::CONFLICT,
913 "DBSnapshotAlreadyExists",
914 format!("DBSnapshot {snapshot_id} already exists."),
915 ));
916 }
917
918 let instance = state
919 .instances
920 .get(db_instance_identifier)
921 .cloned()
922 .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
923
924 let default_db = default_db_name(&instance.engine);
925 let db_name = instance
926 .db_name
927 .as_deref()
928 .unwrap_or(default_db)
929 .to_string();
930
931 (instance, db_name)
932 };
933
934 let dump_data = runtime
935 .dump_database(
936 db_instance_identifier,
937 &instance_for_snapshot.engine,
938 &instance_for_snapshot.master_username,
939 &instance_for_snapshot.master_user_password,
940 &db_name,
941 )
942 .await
943 .map_err(runtime_error_to_service_error)?;
944
945 let mut accounts = self.state.write();
946 let state = accounts.get_or_create(account_id);
947
948 if state.snapshots.contains_key(snapshot_id) {
949 return Err(AwsServiceError::aws_error(
950 StatusCode::CONFLICT,
951 "DBSnapshotAlreadyExists",
952 format!("DBSnapshot {snapshot_id} already exists."),
953 ));
954 }
955
956 let snapshot_arn = state.db_snapshot_arn(snapshot_id);
957
958 let snapshot = DbSnapshot {
959 db_snapshot_identifier: snapshot_id.to_string(),
960 db_snapshot_arn: snapshot_arn,
961 db_instance_identifier: db_instance_identifier.to_string(),
962 snapshot_create_time: Utc::now(),
963 engine: instance_for_snapshot.engine.clone(),
964 engine_version: instance_for_snapshot.engine_version.clone(),
965 allocated_storage: instance_for_snapshot.allocated_storage,
966 status: "available".to_string(),
967 port: instance_for_snapshot.port,
968 master_username: instance_for_snapshot.master_username.clone(),
969 db_name: instance_for_snapshot.db_name.clone(),
970 dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
971 snapshot_type: "automated".to_string(),
972 master_user_password: instance_for_snapshot.master_user_password.clone(),
973 tags: Vec::new(),
974 dump_data,
975 availability_zone: instance_for_snapshot.availability_zone.clone(),
976 vpc_id: None,
977 instance_create_time: Some(instance_for_snapshot.created_at),
978 license_model: Some(
979 service_helpers::license_model_for_engine(&instance_for_snapshot.engine)
980 .to_string(),
981 ),
982 iops: instance_for_snapshot.iops,
983 option_group_name: instance_for_snapshot.option_group_name.clone(),
984 percent_progress: Some(100),
985 storage_type: instance_for_snapshot.storage_type.clone(),
986 encrypted: instance_for_snapshot.storage_encrypted,
987 kms_key_id: instance_for_snapshot.kms_key_id.clone(),
988 iam_database_authentication_enabled: instance_for_snapshot
989 .iam_database_authentication_enabled,
990 timezone: None,
991 storage_throughput: None,
992 };
993
994 state.snapshots.insert(snapshot_id.to_string(), snapshot);
995 Ok(())
996 }
997
998 fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
999 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
1000 let apply_immediately =
1001 parse_optional_bool(optional_query_param(request, "ApplyImmediately").as_deref())?;
1002
1003 let deletion_protection =
1006 parse_optional_bool(optional_query_param(request, "DeletionProtection").as_deref())?;
1007 let backup_retention_period =
1008 parse_optional_i32(optional_query_param(request, "BackupRetentionPeriod").as_deref())?;
1009 let preferred_backup_window = optional_query_param(request, "PreferredBackupWindow");
1010 let preferred_maintenance_window =
1011 optional_query_param(request, "PreferredMaintenanceWindow");
1012 let db_parameter_group_name = optional_query_param(request, "DBParameterGroupName");
1013 let master_user_secret_kms_key_id =
1014 optional_query_param(request, "MasterUserSecretKmsKeyId");
1015 let ca_certificate_identifier = optional_query_param(request, "CACertificateIdentifier");
1016 let monitoring_interval =
1017 parse_optional_i32(optional_query_param(request, "MonitoringInterval").as_deref())?;
1018 let option_group_name = optional_query_param(request, "OptionGroupName");
1019 let auto_minor_version_upgrade = parse_optional_bool(
1020 optional_query_param(request, "AutoMinorVersionUpgrade").as_deref(),
1021 )?;
1022 let copy_tags_to_snapshot =
1023 parse_optional_bool(optional_query_param(request, "CopyTagsToSnapshot").as_deref())?;
1024 let delete_automated_backups = parse_optional_bool(
1025 optional_query_param(request, "DeleteAutomatedBackups").as_deref(),
1026 )?;
1027 let enable_iam_db_auth = parse_optional_bool(
1028 optional_query_param(request, "EnableIAMDatabaseAuthentication").as_deref(),
1029 )?;
1030 let max_allocated_storage =
1031 parse_optional_i32(optional_query_param(request, "MaxAllocatedStorage").as_deref())?;
1032 let network_type = optional_query_param(request, "NetworkType");
1033 let domain = optional_query_param(request, "Domain");
1034 let domain_fqdn = optional_query_param(request, "DomainFqdn");
1035 let domain_ou = optional_query_param(request, "DomainOu");
1036 let domain_iam_role_name = optional_query_param(request, "DomainIAMRoleName");
1037 let domain_auth_secret_arn = optional_query_param(request, "DomainAuthSecretArn");
1038 let domain_dns_ips = {
1039 let v = parse_string_member_list(request, "DomainDnsIps");
1040 if v.is_empty() {
1041 None
1042 } else {
1043 Some(v)
1044 }
1045 };
1046 let disable_domain =
1047 parse_optional_bool(optional_query_param(request, "DisableDomain").as_deref())?;
1048 let rotate_master_user_password = parse_optional_bool(
1049 optional_query_param(request, "RotateMasterUserPassword").as_deref(),
1050 )?;
1051
1052 let db_instance_class = optional_query_param(request, "DBInstanceClass");
1053 let master_user_password = optional_query_param(request, "MasterUserPassword");
1054 let engine_version = optional_query_param(request, "EngineVersion");
1055 let allocated_storage =
1056 parse_optional_i32(optional_query_param(request, "AllocatedStorage").as_deref())?;
1057 let multi_az = parse_optional_bool(optional_query_param(request, "MultiAZ").as_deref())?;
1058 let iops = parse_optional_i32(optional_query_param(request, "Iops").as_deref())?;
1059 let storage_type = optional_query_param(request, "StorageType");
1060 let storage_throughput =
1061 parse_optional_i32(optional_query_param(request, "StorageThroughput").as_deref())?;
1062 let performance_insights_enabled = parse_optional_bool(
1063 optional_query_param(request, "EnablePerformanceInsights").as_deref(),
1064 )?;
1065 let license_model = optional_query_param(request, "LicenseModel");
1066 let multi_tenant =
1067 parse_optional_bool(optional_query_param(request, "MultiTenant").as_deref())?;
1068 let publicly_accessible =
1069 parse_optional_bool(optional_query_param(request, "PubliclyAccessible").as_deref())?;
1070 let tde_credential_arn = optional_query_param(request, "TdeCredentialArn");
1071 let db_port_number =
1072 parse_optional_i32(optional_query_param(request, "DBPortNumber").as_deref())?;
1073
1074 let cloudwatch_enable = collect_cloudwatch_log_types(request, "EnableLogTypes");
1079 let cloudwatch_disable = collect_cloudwatch_log_types(request, "DisableLogTypes");
1080 let cloudwatch_changed = !cloudwatch_enable.is_empty() || !cloudwatch_disable.is_empty();
1081
1082 let vpc_security_group_ids = {
1084 let mut ids = Vec::new();
1085 for index in 1.. {
1086 let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
1087 match optional_query_param(request, &sg_id_name) {
1088 Some(sg_id) => ids.push(sg_id),
1089 None => break,
1090 }
1091 }
1092 if ids.is_empty() {
1093 None
1094 } else {
1095 Some(ids)
1096 }
1097 };
1098
1099 let db_security_groups = {
1102 let mut ids = Vec::new();
1103 for index in 1.. {
1104 let key = format!("DBSecurityGroups.DBSecurityGroupName.{index}");
1105 match optional_query_param(request, &key) {
1106 Some(name) => ids.push(name),
1107 None => break,
1108 }
1109 }
1110 if ids.is_empty() {
1111 None
1112 } else {
1113 Some(ids)
1114 }
1115 };
1116
1117 if let Some(ref class) = db_instance_class {
1118 validate_db_instance_class(class)?;
1119 }
1120
1121 let any_mutable_field = db_instance_class.is_some()
1125 || deletion_protection.is_some()
1126 || vpc_security_group_ids.is_some()
1127 || db_security_groups.is_some()
1128 || master_user_password.is_some()
1129 || backup_retention_period.is_some()
1130 || preferred_backup_window.is_some()
1131 || preferred_maintenance_window.is_some()
1132 || engine_version.is_some()
1133 || allocated_storage.is_some()
1134 || db_parameter_group_name.is_some()
1135 || multi_az.is_some()
1136 || iops.is_some()
1137 || storage_type.is_some()
1138 || storage_throughput.is_some()
1139 || master_user_secret_kms_key_id.is_some()
1140 || ca_certificate_identifier.is_some()
1141 || monitoring_interval.is_some()
1142 || performance_insights_enabled.is_some()
1143 || cloudwatch_changed
1144 || option_group_name.is_some()
1145 || auto_minor_version_upgrade.is_some()
1146 || copy_tags_to_snapshot.is_some()
1147 || delete_automated_backups.is_some()
1148 || enable_iam_db_auth.is_some()
1149 || max_allocated_storage.is_some()
1150 || network_type.is_some()
1151 || license_model.is_some()
1152 || multi_tenant.is_some()
1153 || publicly_accessible.is_some()
1154 || tde_credential_arn.is_some()
1155 || db_port_number.is_some()
1156 || domain.is_some()
1157 || domain_fqdn.is_some()
1158 || domain_ou.is_some()
1159 || domain_iam_role_name.is_some()
1160 || domain_auth_secret_arn.is_some()
1161 || domain_dns_ips.is_some()
1162 || disable_domain.is_some()
1163 || rotate_master_user_password.is_some();
1164 if !any_mutable_field {
1165 return Err(AwsServiceError::aws_error(
1166 StatusCode::BAD_REQUEST,
1167 "InvalidParameterCombination",
1168 "At least one mutable field must be provided.",
1169 ));
1170 }
1171
1172 let mut accounts = self.state.write();
1173 let state = accounts.get_or_create(&request.account_id);
1174 let instance = state
1175 .instances
1176 .get_mut(&db_instance_identifier)
1177 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1178
1179 if let Some(deletion_protection) = deletion_protection {
1181 instance.deletion_protection = deletion_protection;
1182 }
1183 if let Some(security_group_ids) = vpc_security_group_ids {
1184 instance.vpc_security_group_ids = security_group_ids;
1185 }
1186 if let Some(sg_names) = db_security_groups {
1187 instance.db_security_groups = sg_names;
1188 }
1189 if let Some(ca_id) = ca_certificate_identifier {
1190 instance.ca_certificate_identifier = Some(ca_id);
1191 }
1192 if let Some(kms_key) = master_user_secret_kms_key_id {
1193 instance.master_user_secret_kms_key_id = Some(kms_key);
1194 }
1195 if let Some(name) = option_group_name {
1196 instance.option_group_name = Some(name);
1197 }
1198 if let Some(b) = auto_minor_version_upgrade {
1199 instance.auto_minor_version_upgrade = Some(b);
1200 }
1201 if let Some(b) = copy_tags_to_snapshot {
1202 instance.copy_tags_to_snapshot = Some(b);
1203 }
1204 if let Some(b) = delete_automated_backups {
1205 instance.delete_automated_backups = Some(b);
1206 }
1207 if let Some(b) = enable_iam_db_auth {
1208 instance.iam_database_authentication_enabled = b;
1209 }
1210 if let Some(n) = max_allocated_storage {
1211 instance.max_allocated_storage = Some(n);
1212 }
1213 if let Some(nt) = network_type {
1214 instance.network_type = Some(nt);
1215 }
1216 if disable_domain == Some(true) {
1217 instance.domain = None;
1218 instance.domain_fqdn = None;
1219 instance.domain_ou = None;
1220 instance.domain_iam_role_name = None;
1221 instance.domain_auth_secret_arn = None;
1222 instance.domain_dns_ips.clear();
1223 } else {
1224 if let Some(v) = domain {
1225 instance.domain = Some(v);
1226 }
1227 if let Some(v) = domain_fqdn {
1228 instance.domain_fqdn = Some(v);
1229 }
1230 if let Some(v) = domain_ou {
1231 instance.domain_ou = Some(v);
1232 }
1233 if let Some(v) = domain_iam_role_name {
1234 instance.domain_iam_role_name = Some(v);
1235 }
1236 if let Some(v) = domain_auth_secret_arn {
1237 instance.domain_auth_secret_arn = Some(v);
1238 }
1239 if let Some(v) = domain_dns_ips {
1240 instance.domain_dns_ips = v;
1241 }
1242 }
1243 if cloudwatch_changed {
1244 let mut current: Vec<String> = instance.enabled_cloudwatch_logs_exports.clone();
1245 current.retain(|t| !cloudwatch_disable.contains(t));
1246 for t in &cloudwatch_enable {
1247 if !current.contains(t) {
1248 current.push(t.clone());
1249 }
1250 }
1251 instance.enabled_cloudwatch_logs_exports = current;
1252 }
1253 if rotate_master_user_password == Some(true) {
1257 instance.master_user_password = format!("rotated-{}", uuid::Uuid::new_v4().simple());
1258 }
1259
1260 let immediate = apply_immediately != Some(false);
1262 if immediate {
1263 if let Some(class) = db_instance_class {
1264 instance.db_instance_class = class;
1265 }
1266 if let Some(pwd) = master_user_password {
1267 instance.master_user_password = pwd;
1268 }
1269 if let Some(version) = engine_version {
1270 instance.engine_version = version;
1271 }
1272 if let Some(storage) = allocated_storage {
1273 instance.allocated_storage = storage;
1274 }
1275 if let Some(name) = db_parameter_group_name {
1276 instance.db_parameter_group_name = Some(name);
1277 }
1278 if let Some(az) = multi_az {
1279 instance.multi_az = az;
1280 }
1281 if let Some(iops_val) = iops {
1282 instance.iops = Some(iops_val);
1283 }
1284 if let Some(stype) = storage_type {
1285 instance.storage_type = Some(stype);
1286 }
1287 if let Some(t) = storage_throughput {
1288 instance.storage_throughput = Some(t);
1289 }
1290 if let Some(pi) = performance_insights_enabled {
1291 instance.performance_insights_enabled = pi;
1292 }
1293 if let Some(lm) = license_model {
1294 instance.license_model = Some(lm);
1295 }
1296 if let Some(b) = multi_tenant {
1297 instance.multi_tenant = Some(b);
1298 }
1299 if let Some(b) = publicly_accessible {
1300 instance.publicly_accessible = b;
1301 }
1302 if let Some(arn) = tde_credential_arn {
1303 instance.tde_credential_arn = Some(arn);
1304 }
1305 if let Some(p) = db_port_number {
1306 instance.port = p;
1307 }
1308 if let Some(retention) = backup_retention_period {
1309 instance.backup_retention_period = retention;
1310 }
1311 if let Some(window) = preferred_backup_window {
1312 instance.preferred_backup_window = window;
1313 }
1314 if let Some(window) = preferred_maintenance_window {
1315 instance.preferred_maintenance_window = Some(window);
1316 }
1317 if let Some(interval) = monitoring_interval {
1318 instance.monitoring_interval = Some(interval);
1319 }
1320 } else {
1321 let any_deferred = db_instance_class.is_some()
1322 || master_user_password.is_some()
1323 || engine_version.is_some()
1324 || allocated_storage.is_some()
1325 || db_parameter_group_name.is_some()
1326 || multi_az.is_some()
1327 || iops.is_some()
1328 || storage_type.is_some()
1329 || storage_throughput.is_some()
1330 || performance_insights_enabled.is_some()
1331 || license_model.is_some()
1332 || multi_tenant.is_some()
1333 || publicly_accessible.is_some()
1334 || tde_credential_arn.is_some()
1335 || db_port_number.is_some()
1336 || backup_retention_period.is_some()
1337 || preferred_backup_window.is_some()
1338 || preferred_maintenance_window.is_some()
1339 || monitoring_interval.is_some();
1340 if any_deferred {
1341 let pending = instance
1342 .pending_modified_values
1343 .get_or_insert(Default::default());
1344 if let Some(class) = db_instance_class {
1345 pending.db_instance_class = Some(class);
1346 }
1347 if let Some(pwd) = master_user_password {
1348 pending.master_user_password = Some(pwd);
1349 }
1350 if let Some(version) = engine_version {
1351 pending.engine_version = Some(version);
1352 }
1353 if let Some(storage) = allocated_storage {
1354 pending.allocated_storage = Some(storage);
1355 }
1356 if let Some(name) = db_parameter_group_name {
1357 pending.db_parameter_group_name = Some(name);
1358 }
1359 if let Some(az) = multi_az {
1360 pending.multi_az = Some(az);
1361 }
1362 if let Some(iops_val) = iops {
1363 pending.iops = Some(iops_val);
1364 }
1365 if let Some(stype) = storage_type {
1366 pending.storage_type = Some(stype);
1367 }
1368 if let Some(t) = storage_throughput {
1369 pending.storage_throughput = Some(t);
1370 }
1371 if let Some(pi) = performance_insights_enabled {
1372 pending.performance_insights_enabled = Some(pi);
1373 }
1374 if let Some(lm) = license_model {
1375 pending.license_model = Some(lm);
1376 }
1377 if let Some(b) = multi_tenant {
1378 pending.multi_tenant = Some(b);
1379 }
1380 if let Some(b) = publicly_accessible {
1381 pending.publicly_accessible = Some(b);
1382 }
1383 if let Some(arn) = tde_credential_arn {
1384 pending.tde_credential_arn = Some(arn);
1385 }
1386 if let Some(p) = db_port_number {
1387 pending.port = Some(p);
1388 }
1389 if let Some(retention) = backup_retention_period {
1390 pending.backup_retention_period = Some(retention);
1391 }
1392 if let Some(window) = preferred_backup_window {
1393 pending.preferred_backup_window = Some(window);
1394 }
1395 if let Some(window) = preferred_maintenance_window {
1396 pending.preferred_maintenance_window = Some(window);
1397 }
1398 if let Some(interval) = monitoring_interval {
1399 pending.monitoring_interval = Some(interval);
1400 }
1401 }
1402 }
1403 let instance_arn = instance.db_instance_arn.clone();
1404 let xml = query_response_xml(
1405 "ModifyDBInstance",
1406 RDS_NS,
1407 &format!(
1408 "<DBInstance>{}</DBInstance>",
1409 db_instance_xml(instance, Some("modifying"))
1410 ),
1411 &request.request_id,
1412 );
1413 drop(accounts);
1414
1415 self.emit_event(
1416 RdsSourceType::DbInstance,
1417 &db_instance_identifier,
1418 &instance_arn,
1419 "RDS-EVENT-0014",
1420 &["configuration change"],
1421 "DB instance was modified",
1422 );
1423
1424 Ok(AwsResponse::xml(StatusCode::OK, xml))
1425 }
1426
1427 async fn reboot_db_instance(
1428 &self,
1429 request: &AwsRequest,
1430 ) -> Result<AwsResponse, AwsServiceError> {
1431 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
1432 let force_failover =
1433 parse_optional_bool(optional_query_param(request, "ForceFailover").as_deref())?;
1434 if force_failover == Some(true) {
1435 return Err(AwsServiceError::aws_error(
1436 StatusCode::BAD_REQUEST,
1437 "InvalidParameterCombination",
1438 "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
1439 ));
1440 }
1441
1442 let instance = {
1443 let accounts = self.state.read();
1444 let empty = RdsState::new(&request.account_id, &request.region);
1445 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1446 state
1447 .instances
1448 .get(&db_instance_identifier)
1449 .cloned()
1450 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
1451 };
1452
1453 let runtime = self.require_runtime()?;
1454
1455 let running = runtime
1456 .restart_container(
1457 &db_instance_identifier,
1458 &instance.engine,
1459 &instance.master_username,
1460 &instance.master_user_password,
1461 instance
1462 .db_name
1463 .as_deref()
1464 .unwrap_or(default_db_name(&instance.engine)),
1465 )
1466 .await
1467 .map_err(runtime_error_to_service_error)?;
1468
1469 let instance = {
1470 let mut accounts = self.state.write();
1471 let state = accounts.get_or_create(&request.account_id);
1472 let instance = state
1473 .instances
1474 .get_mut(&db_instance_identifier)
1475 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1476 instance.host_port = running.host_port;
1477 instance.port = i32::from(running.host_port);
1478
1479 if let Some(pending) = instance.pending_modified_values.take() {
1481 apply_pending_to_instance(instance, pending);
1482 }
1483
1484 instance.clone()
1485 };
1486
1487 self.emit_event(
1488 RdsSourceType::DbInstance,
1489 &db_instance_identifier,
1490 &instance.db_instance_arn,
1491 "RDS-EVENT-0006",
1492 &["availability"],
1493 "DB instance restarted",
1494 );
1495
1496 Ok(AwsResponse::xml(
1497 StatusCode::OK,
1498 query_response_xml(
1499 "RebootDBInstance",
1500 RDS_NS,
1501 &format!(
1502 "<DBInstance>{}</DBInstance>",
1503 db_instance_xml(&instance, Some("rebooting"))
1504 ),
1505 &request.request_id,
1506 ),
1507 ))
1508 }
1509
1510 fn describe_db_engine_versions(
1511 &self,
1512 request: &AwsRequest,
1513 ) -> Result<AwsResponse, AwsServiceError> {
1514 let engine = optional_query_param(request, "Engine");
1515 let engine_version = optional_query_param(request, "EngineVersion");
1516 let family = optional_query_param(request, "DBParameterGroupFamily");
1517 let default_only =
1518 parse_optional_bool(optional_query_param(request, "DefaultOnly").as_deref())?;
1519
1520 let mut versions = filter_engine_versions(
1521 &default_engine_versions(),
1522 &engine,
1523 &engine_version,
1524 &family,
1525 );
1526
1527 if default_only.unwrap_or(false) {
1528 versions.truncate(1);
1529 }
1530
1531 Ok(AwsResponse::xml(
1532 StatusCode::OK,
1533 query_response_xml(
1534 "DescribeDBEngineVersions",
1535 RDS_NS,
1536 &format!(
1537 "<DBEngineVersions>{}</DBEngineVersions>",
1538 versions.iter().map(engine_version_xml).collect::<String>()
1539 ),
1540 &request.request_id,
1541 ),
1542 ))
1543 }
1544
1545 fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1546 let db_instance_identifier = optional_query_param(request, "DBInstanceIdentifier");
1547 let marker = optional_query_param(request, "Marker");
1548 let max_records = optional_query_param(request, "MaxRecords");
1549
1550 let accounts = self.state.read();
1551 let empty = RdsState::new(&request.account_id, &request.region);
1552 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1553
1554 if let Some(identifier) = db_instance_identifier {
1556 let instance = state
1557 .instances
1558 .get(&identifier)
1559 .cloned()
1560 .ok_or_else(|| db_instance_not_found(&identifier))?;
1561
1562 return Ok(AwsResponse::xml(
1563 StatusCode::OK,
1564 query_response_xml(
1565 "DescribeDBInstances",
1566 RDS_NS,
1567 &format!(
1568 "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
1569 db_instance_xml(&instance, None)
1570 ),
1571 &request.request_id,
1572 ),
1573 ));
1574 }
1575
1576 let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
1578 instances.sort_by(|a, b| {
1579 a.created_at
1580 .cmp(&b.created_at)
1581 .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
1582 });
1583
1584 let paginated = paginate(instances, marker, max_records, |inst| {
1586 &inst.db_instance_identifier
1587 })?;
1588
1589 let marker_xml = paginated
1590 .next_marker
1591 .as_ref()
1592 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1593 .unwrap_or_default();
1594
1595 Ok(AwsResponse::xml(
1596 StatusCode::OK,
1597 query_response_xml(
1598 "DescribeDBInstances",
1599 RDS_NS,
1600 &format!(
1601 "<DBInstances>{}</DBInstances>{}",
1602 paginated
1603 .items
1604 .iter()
1605 .map(|instance| {
1606 format!(
1607 "<DBInstance>{}</DBInstance>",
1608 db_instance_xml(instance, None)
1609 )
1610 })
1611 .collect::<String>(),
1612 marker_xml
1613 ),
1614 &request.request_id,
1615 ),
1616 ))
1617 }
1618
1619 fn describe_orderable_db_instance_options(
1620 &self,
1621 request: &AwsRequest,
1622 ) -> Result<AwsResponse, AwsServiceError> {
1623 let engine = optional_query_param(request, "Engine");
1624 let engine_version = optional_query_param(request, "EngineVersion");
1625 let db_instance_class = optional_query_param(request, "DBInstanceClass");
1626 let license_model = optional_query_param(request, "LicenseModel");
1627 let vpc = parse_optional_bool(optional_query_param(request, "Vpc").as_deref())?;
1628
1629 let options = filter_orderable_options(
1630 &default_orderable_options(),
1631 &engine,
1632 &engine_version,
1633 &db_instance_class,
1634 &license_model,
1635 vpc,
1636 );
1637
1638 Ok(AwsResponse::xml(
1639 StatusCode::OK,
1640 query_response_xml(
1641 "DescribeOrderableDBInstanceOptions",
1642 RDS_NS,
1643 &format!(
1644 "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
1645 options.iter().map(orderable_option_xml).collect::<String>()
1646 ),
1647 &request.request_id,
1648 ),
1649 ))
1650 }
1651
1652 fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1653 let resource_name = required_query_param(request, "ResourceName")?;
1654 let tags = parse_tags(request)?;
1655
1656 if tags.is_empty() {
1657 return Err(AwsServiceError::aws_error(
1658 StatusCode::BAD_REQUEST,
1659 "MissingParameter",
1660 "The request must contain the parameter Tags.",
1661 ));
1662 }
1663
1664 let mut accounts = self.state.write();
1665 let state = accounts.get_or_create(&request.account_id);
1666 let mut target = resolve_tag_target_mut(state, &resource_name)?;
1667 target.merge(&tags);
1668
1669 Ok(AwsResponse::xml(
1670 StatusCode::OK,
1671 query_response_xml("AddTagsToResource", RDS_NS, "", &request.request_id),
1672 ))
1673 }
1674
1675 fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1676 let resource_name = required_query_param(request, "ResourceName")?;
1677 if query_param_prefix_exists(request, "Filters.") {
1678 return Err(AwsServiceError::aws_error(
1679 StatusCode::BAD_REQUEST,
1680 "InvalidParameterValue",
1681 "Filters are not yet supported for ListTagsForResource.",
1682 ));
1683 }
1684
1685 let accounts = self.state.read();
1686 let empty = RdsState::new(&request.account_id, &request.region);
1687 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1688 let target = resolve_tag_target(state, &resource_name)?;
1689 let tag_xml = target.to_xml();
1690
1691 Ok(AwsResponse::xml(
1692 StatusCode::OK,
1693 query_response_xml(
1694 "ListTagsForResource",
1695 RDS_NS,
1696 &format!("<TagList>{tag_xml}</TagList>"),
1697 &request.request_id,
1698 ),
1699 ))
1700 }
1701
1702 fn remove_tags_from_resource(
1703 &self,
1704 request: &AwsRequest,
1705 ) -> Result<AwsResponse, AwsServiceError> {
1706 let resource_name = required_query_param(request, "ResourceName")?;
1707 let tag_keys = parse_tag_keys(request)?;
1708
1709 if tag_keys.is_empty() {
1710 return Err(AwsServiceError::aws_error(
1711 StatusCode::BAD_REQUEST,
1712 "MissingParameter",
1713 "The request must contain the parameter TagKeys.",
1714 ));
1715 }
1716
1717 let mut accounts = self.state.write();
1718 let state = accounts.get_or_create(&request.account_id);
1719 let mut target = resolve_tag_target_mut(state, &resource_name)?;
1720 target.remove_keys(&tag_keys);
1721
1722 Ok(AwsResponse::xml(
1723 StatusCode::OK,
1724 query_response_xml("RemoveTagsFromResource", RDS_NS, "", &request.request_id),
1725 ))
1726 }
1727
1728 async fn create_db_snapshot(
1729 &self,
1730 request: &AwsRequest,
1731 ) -> Result<AwsResponse, AwsServiceError> {
1732 let db_snapshot_identifier = required_query_param(request, "DBSnapshotIdentifier")?;
1733 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
1734
1735 let runtime = self.runtime.as_ref().ok_or_else(|| {
1736 AwsServiceError::aws_error(
1737 StatusCode::SERVICE_UNAVAILABLE,
1738 "InvalidParameterValue",
1739 "Docker/Podman is required for RDS snapshots but is not available",
1740 )
1741 })?;
1742
1743 let (instance, db_name) = {
1744 let accounts = self.state.read();
1745 let empty = RdsState::new(&request.account_id, &request.region);
1746 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1747
1748 if state.snapshots.contains_key(&db_snapshot_identifier) {
1749 return Err(AwsServiceError::aws_error(
1750 StatusCode::CONFLICT,
1751 "DBSnapshotAlreadyExists",
1752 format!("DBSnapshot {db_snapshot_identifier} already exists."),
1753 ));
1754 }
1755
1756 let instance = state
1757 .instances
1758 .get(&db_instance_identifier)
1759 .cloned()
1760 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1761
1762 let default_db = default_db_name(&instance.engine);
1763 let db_name = instance
1764 .db_name
1765 .as_deref()
1766 .unwrap_or(default_db)
1767 .to_string();
1768
1769 (instance, db_name)
1770 };
1771
1772 let dump_data = runtime
1773 .dump_database(
1774 &db_instance_identifier,
1775 &instance.engine,
1776 &instance.master_username,
1777 &instance.master_user_password,
1778 &db_name,
1779 )
1780 .await
1781 .map_err(runtime_error_to_service_error)?;
1782
1783 let mut accounts = self.state.write();
1784 let state = accounts.get_or_create(&request.account_id);
1785
1786 if state.snapshots.contains_key(&db_snapshot_identifier) {
1787 return Err(AwsServiceError::aws_error(
1788 StatusCode::CONFLICT,
1789 "DBSnapshotAlreadyExists",
1790 format!("DBSnapshot {db_snapshot_identifier} already exists."),
1791 ));
1792 }
1793
1794 let snapshot = DbSnapshot {
1795 db_snapshot_identifier: db_snapshot_identifier.clone(),
1796 db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
1797 db_instance_identifier: instance.db_instance_identifier.clone(),
1798 snapshot_create_time: Utc::now(),
1799 engine: instance.engine.clone(),
1800 engine_version: instance.engine_version.clone(),
1801 allocated_storage: instance.allocated_storage,
1802 status: "available".to_string(),
1803 port: instance.port,
1804 master_username: instance.master_username.clone(),
1805 db_name: instance.db_name.clone(),
1806 dbi_resource_id: instance.dbi_resource_id.clone(),
1807 snapshot_type: "manual".to_string(),
1808 master_user_password: instance.master_user_password.clone(),
1809 tags: Vec::new(),
1810 dump_data,
1811 availability_zone: instance.availability_zone.clone(),
1812 vpc_id: None,
1813 instance_create_time: Some(instance.created_at),
1814 license_model: Some(
1815 service_helpers::license_model_for_engine(&instance.engine).to_string(),
1816 ),
1817 iops: instance.iops,
1818 option_group_name: instance.option_group_name.clone(),
1819 percent_progress: Some(100),
1820 storage_type: instance.storage_type.clone(),
1821 encrypted: instance.storage_encrypted,
1822 kms_key_id: instance.kms_key_id.clone(),
1823 iam_database_authentication_enabled: instance.iam_database_authentication_enabled,
1824 timezone: None,
1825 storage_throughput: None,
1826 };
1827
1828 state
1829 .snapshots
1830 .insert(db_snapshot_identifier.clone(), snapshot.clone());
1831 let snapshot_arn = snapshot.db_snapshot_arn.clone();
1832 drop(accounts);
1833
1834 self.emit_event(
1835 RdsSourceType::DbSnapshot,
1836 &db_snapshot_identifier,
1837 &snapshot_arn,
1838 "RDS-EVENT-0042",
1839 &["creation"],
1840 "Manual snapshot created",
1841 );
1842
1843 Ok(AwsResponse::xml(
1844 StatusCode::OK,
1845 query_response_xml(
1846 "CreateDBSnapshot",
1847 RDS_NS,
1848 &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1849 &request.request_id,
1850 ),
1851 ))
1852 }
1853
1854 fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1855 let db_snapshot_identifier = optional_query_param(request, "DBSnapshotIdentifier");
1856 let db_instance_identifier = optional_query_param(request, "DBInstanceIdentifier");
1857 let marker = optional_query_param(request, "Marker");
1858 let max_records = optional_query_param(request, "MaxRecords");
1859
1860 if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
1861 return Err(AwsServiceError::aws_error(
1862 StatusCode::BAD_REQUEST,
1863 "InvalidParameterCombination",
1864 "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
1865 ));
1866 }
1867
1868 let accounts = self.state.read();
1869 let empty = RdsState::new(&request.account_id, &request.region);
1870 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1871
1872 if let Some(snapshot_id) = db_snapshot_identifier {
1874 let snapshot = state
1875 .snapshots
1876 .get(&snapshot_id)
1877 .cloned()
1878 .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
1879
1880 return Ok(AwsResponse::xml(
1881 StatusCode::OK,
1882 query_response_xml(
1883 "DescribeDBSnapshots",
1884 RDS_NS,
1885 &format!(
1886 "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
1887 db_snapshot_xml(&snapshot)
1888 ),
1889 &request.request_id,
1890 ),
1891 ));
1892 }
1893
1894 let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
1896 state
1897 .snapshots
1898 .values()
1899 .filter(|s| s.db_instance_identifier == instance_id)
1900 .cloned()
1901 .collect()
1902 } else {
1903 state.snapshots.values().cloned().collect()
1904 };
1905
1906 snapshots.sort_by(|a, b| {
1908 a.snapshot_create_time
1909 .cmp(&b.snapshot_create_time)
1910 .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
1911 });
1912
1913 let paginated = paginate(snapshots, marker, max_records, |snap| {
1915 &snap.db_snapshot_identifier
1916 })?;
1917
1918 let marker_xml = paginated
1919 .next_marker
1920 .as_ref()
1921 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1922 .unwrap_or_default();
1923
1924 Ok(AwsResponse::xml(
1925 StatusCode::OK,
1926 query_response_xml(
1927 "DescribeDBSnapshots",
1928 RDS_NS,
1929 &format!(
1930 "<DBSnapshots>{}</DBSnapshots>{}",
1931 paginated
1932 .items
1933 .iter()
1934 .map(|snapshot| format!(
1935 "<DBSnapshot>{}</DBSnapshot>",
1936 db_snapshot_xml(snapshot)
1937 ))
1938 .collect::<String>(),
1939 marker_xml
1940 ),
1941 &request.request_id,
1942 ),
1943 ))
1944 }
1945
1946 fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1947 let db_snapshot_identifier = required_query_param(request, "DBSnapshotIdentifier")?;
1948
1949 let mut accounts = self.state.write();
1950 let state = accounts.get_or_create(&request.account_id);
1951
1952 let snapshot = state
1953 .snapshots
1954 .remove(&db_snapshot_identifier)
1955 .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1956 let snapshot_arn = snapshot.db_snapshot_arn.clone();
1957 drop(accounts);
1958
1959 self.emit_event(
1960 RdsSourceType::DbSnapshot,
1961 &db_snapshot_identifier,
1962 &snapshot_arn,
1963 "RDS-EVENT-0041",
1964 &["deletion"],
1965 "Manual snapshot deleted",
1966 );
1967
1968 Ok(AwsResponse::xml(
1969 StatusCode::OK,
1970 query_response_xml(
1971 "DeleteDBSnapshot",
1972 RDS_NS,
1973 &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1974 &request.request_id,
1975 ),
1976 ))
1977 }
1978
1979 async fn restore_db_instance_from_db_snapshot(
1980 &self,
1981 request: &AwsRequest,
1982 ) -> Result<AwsResponse, AwsServiceError> {
1983 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
1984 let db_snapshot_identifier = required_query_param(request, "DBSnapshotIdentifier")?;
1985 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
1986 let tags = parse_tags(request)?;
1987
1988 let runtime = self.require_runtime()?;
1989
1990 let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
1991 let mut accounts = self.state.write();
1992 let state = accounts.get_or_create(&request.account_id);
1993
1994 if !state.begin_instance_creation(&db_instance_identifier) {
1995 return Err(AwsServiceError::aws_error(
1996 StatusCode::CONFLICT,
1997 "DBInstanceAlreadyExists",
1998 format!("DBInstance {db_instance_identifier} already exists."),
1999 ));
2000 }
2001
2002 let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
2003 Some(s) => s,
2004 None => {
2005 state.cancel_instance_creation(&db_instance_identifier);
2006 return Err(db_snapshot_not_found(&db_snapshot_identifier));
2007 }
2008 };
2009
2010 let dbi_resource_id = state.next_dbi_resource_id();
2011 let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
2012 let created_at = Utc::now();
2013
2014 (snapshot, dbi_resource_id, db_instance_arn, created_at)
2015 };
2016
2017 let db_name = snapshot
2018 .db_name
2019 .as_deref()
2020 .unwrap_or(default_db_name(&snapshot.engine));
2021 let running = match runtime
2022 .ensure_postgres(
2023 &db_instance_identifier,
2024 &snapshot.engine,
2025 &snapshot.engine_version,
2026 &snapshot.master_username,
2027 &snapshot.master_user_password,
2028 db_name,
2029 &request.account_id,
2030 &request.region,
2031 )
2032 .await
2033 {
2034 Ok(running) => running,
2035 Err(e) => {
2036 self.state
2037 .write()
2038 .get_or_create(&request.account_id)
2039 .cancel_instance_creation(&db_instance_identifier);
2040 return Err(runtime_error_to_service_error(e));
2041 }
2042 };
2043
2044 if let Err(e) = runtime
2045 .restore_database(
2046 &db_instance_identifier,
2047 &snapshot.engine,
2048 &snapshot.master_username,
2049 &snapshot.master_user_password,
2050 db_name,
2051 &snapshot.dump_data,
2052 )
2053 .await
2054 {
2055 self.state
2056 .write()
2057 .get_or_create(&request.account_id)
2058 .cancel_instance_creation(&db_instance_identifier);
2059 runtime.stop_container(&db_instance_identifier).await;
2060 return Err(runtime_error_to_service_error(e));
2061 }
2062
2063 let instance = build_restored_instance(
2064 &db_instance_identifier,
2065 db_instance_arn,
2066 dbi_resource_id,
2067 created_at,
2068 vpc_security_group_ids,
2069 &snapshot,
2070 &running,
2071 tags,
2072 );
2073
2074 self.state
2075 .write()
2076 .get_or_create(&request.account_id)
2077 .finish_instance_creation(instance.clone());
2078
2079 self.emit_event(
2080 RdsSourceType::DbInstance,
2081 &db_instance_identifier,
2082 &instance.db_instance_arn,
2083 "RDS-EVENT-0043",
2084 &["creation"],
2085 "DB instance restored from snapshot",
2086 );
2087
2088 Ok(AwsResponse::xml(
2089 StatusCode::OK,
2090 query_response_xml(
2091 "RestoreDBInstanceFromDBSnapshot",
2092 RDS_NS,
2093 &format!(
2094 "<DBInstance>{}</DBInstance>",
2095 db_instance_xml(&instance, None)
2096 ),
2097 &request.request_id,
2098 ),
2099 ))
2100 }
2101
2102 async fn create_db_instance_read_replica(
2103 &self,
2104 request: &AwsRequest,
2105 ) -> Result<AwsResponse, AwsServiceError> {
2106 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
2107 let source_db_instance_identifier =
2108 required_query_param(request, "SourceDBInstanceIdentifier")?;
2109
2110 let runtime = self.runtime.as_ref().ok_or_else(|| {
2111 AwsServiceError::aws_error(
2112 StatusCode::SERVICE_UNAVAILABLE,
2113 "InvalidParameterValue",
2114 "Docker/Podman is required for RDS read replicas but is not available",
2115 )
2116 })?;
2117
2118 let (source_instance, db_name) = {
2119 let mut accounts = self.state.write();
2120 let state = accounts.get_or_create(&request.account_id);
2121
2122 if !state.begin_instance_creation(&db_instance_identifier) {
2123 return Err(AwsServiceError::aws_error(
2124 StatusCode::CONFLICT,
2125 "DBInstanceAlreadyExists",
2126 format!("DBInstance {db_instance_identifier} already exists."),
2127 ));
2128 }
2129
2130 let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
2131 {
2132 Some(inst) => inst,
2133 None => {
2134 state.cancel_instance_creation(&db_instance_identifier);
2135 return Err(db_instance_not_found(&source_db_instance_identifier));
2136 }
2137 };
2138
2139 let default_db = default_db_name(&source_instance.engine);
2140 let db_name = source_instance
2141 .db_name
2142 .as_deref()
2143 .unwrap_or(default_db)
2144 .to_string();
2145
2146 (source_instance, db_name)
2147 };
2148
2149 let dump_data = match runtime
2150 .dump_database(
2151 &source_db_instance_identifier,
2152 &source_instance.engine,
2153 &source_instance.master_username,
2154 &source_instance.master_user_password,
2155 &db_name,
2156 )
2157 .await
2158 {
2159 Ok(data) => data,
2160 Err(e) => {
2161 self.state
2162 .write()
2163 .get_or_create(&request.account_id)
2164 .cancel_instance_creation(&db_instance_identifier);
2165 return Err(runtime_error_to_service_error(e));
2166 }
2167 };
2168
2169 let (dbi_resource_id, db_instance_arn) = {
2170 let accounts = self.state.read();
2171 let empty = RdsState::new(&request.account_id, &request.region);
2172 let s = accounts.get(&request.account_id).unwrap_or(&empty);
2173 (
2174 s.next_dbi_resource_id(),
2175 s.db_instance_arn(&db_instance_identifier),
2176 )
2177 };
2178 let created_at = Utc::now();
2179
2180 let running = match runtime
2181 .ensure_postgres(
2182 &db_instance_identifier,
2183 &source_instance.engine,
2184 &source_instance.engine_version,
2185 &source_instance.master_username,
2186 &source_instance.master_user_password,
2187 &db_name,
2188 &request.account_id,
2189 &request.region,
2190 )
2191 .await
2192 {
2193 Ok(running) => running,
2194 Err(e) => {
2195 self.state
2196 .write()
2197 .get_or_create(&request.account_id)
2198 .cancel_instance_creation(&db_instance_identifier);
2199 return Err(runtime_error_to_service_error(e));
2200 }
2201 };
2202
2203 if let Err(e) = runtime
2204 .restore_database(
2205 &db_instance_identifier,
2206 &source_instance.engine,
2207 &source_instance.master_username,
2208 &source_instance.master_user_password,
2209 &db_name,
2210 &dump_data,
2211 )
2212 .await
2213 {
2214 self.state
2215 .write()
2216 .get_or_create(&request.account_id)
2217 .cancel_instance_creation(&db_instance_identifier);
2218 runtime.stop_container(&db_instance_identifier).await;
2219 return Err(runtime_error_to_service_error(e));
2220 }
2221
2222 let replica = build_read_replica_instance(
2223 &db_instance_identifier,
2224 db_instance_arn,
2225 dbi_resource_id,
2226 created_at,
2227 &source_db_instance_identifier,
2228 &source_instance,
2229 &running,
2230 );
2231
2232 let source_missing = {
2233 let mut accounts = self.state.write();
2234 let state = accounts.get_or_create(&request.account_id);
2235 match state.instances.get_mut(&source_db_instance_identifier) {
2236 Some(source) => {
2237 source
2238 .read_replica_db_instance_identifiers
2239 .push(db_instance_identifier.clone());
2240 state.finish_instance_creation(replica.clone());
2241 false
2242 }
2243 None => {
2244 state.cancel_instance_creation(&db_instance_identifier);
2245 true
2246 }
2247 }
2248 };
2249
2250 if source_missing {
2251 runtime.stop_container(&db_instance_identifier).await;
2252 return Err(db_instance_not_found(&source_db_instance_identifier));
2253 }
2254
2255 self.emit_event(
2256 RdsSourceType::DbInstance,
2257 &db_instance_identifier,
2258 &replica.db_instance_arn,
2259 "RDS-EVENT-0005",
2260 &["creation", "read replica"],
2261 "Read replica DB instance created",
2262 );
2263
2264 Ok(AwsResponse::xml(
2265 StatusCode::OK,
2266 query_response_xml(
2267 "CreateDBInstanceReadReplica",
2268 RDS_NS,
2269 &format!(
2270 "<DBInstance>{}</DBInstance>",
2271 db_instance_xml(&replica, None)
2272 ),
2273 &request.request_id,
2274 ),
2275 ))
2276 }
2277
2278 async fn restore_db_instance_to_point_in_time(
2279 &self,
2280 request: &AwsRequest,
2281 ) -> Result<AwsResponse, AwsServiceError> {
2282 let target_id = required_query_param(request, "TargetDBInstanceIdentifier")?;
2283 let source_id = required_query_param(request, "SourceDBInstanceIdentifier")?;
2284 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
2285 let tags = parse_tags(request)?;
2286
2287 let (source_instance, db_name) = {
2288 let mut accounts = self.state.write();
2289 let state = accounts.get_or_create(&request.account_id);
2290
2291 if !state.begin_instance_creation(&target_id) {
2292 return Err(AwsServiceError::aws_error(
2293 StatusCode::CONFLICT,
2294 "DBInstanceAlreadyExists",
2295 format!("DBInstance {target_id} already exists."),
2296 ));
2297 }
2298
2299 let source_instance = match state.instances.get(&source_id).cloned() {
2300 Some(inst) => inst,
2301 None => {
2302 state.cancel_instance_creation(&target_id);
2303 return Err(db_instance_not_found(&source_id));
2304 }
2305 };
2306
2307 let default_db = default_db_name(&source_instance.engine);
2308 let db_name = source_instance
2309 .db_name
2310 .as_deref()
2311 .unwrap_or(default_db)
2312 .to_string();
2313
2314 (source_instance, db_name)
2315 };
2316
2317 let runtime = match self.require_runtime() {
2318 Ok(rt) => rt,
2319 Err(e) => {
2320 self.state
2321 .write()
2322 .get_or_create(&request.account_id)
2323 .cancel_instance_creation(&target_id);
2324 return Err(e);
2325 }
2326 };
2327
2328 let dump_data = match runtime
2329 .dump_database(
2330 &source_id,
2331 &source_instance.engine,
2332 &source_instance.master_username,
2333 &source_instance.master_user_password,
2334 &db_name,
2335 )
2336 .await
2337 {
2338 Ok(data) => data,
2339 Err(e) => {
2340 self.state
2341 .write()
2342 .get_or_create(&request.account_id)
2343 .cancel_instance_creation(&target_id);
2344 return Err(runtime_error_to_service_error(e));
2345 }
2346 };
2347
2348 let (dbi_resource_id, db_instance_arn) = {
2349 let accounts = self.state.read();
2350 let empty = RdsState::new(&request.account_id, &request.region);
2351 let s = accounts.get(&request.account_id).unwrap_or(&empty);
2352 (s.next_dbi_resource_id(), s.db_instance_arn(&target_id))
2353 };
2354 let created_at = Utc::now();
2355
2356 let running = match runtime
2357 .ensure_postgres(
2358 &target_id,
2359 &source_instance.engine,
2360 &source_instance.engine_version,
2361 &source_instance.master_username,
2362 &source_instance.master_user_password,
2363 &db_name,
2364 &request.account_id,
2365 &request.region,
2366 )
2367 .await
2368 {
2369 Ok(running) => running,
2370 Err(e) => {
2371 self.state
2372 .write()
2373 .get_or_create(&request.account_id)
2374 .cancel_instance_creation(&target_id);
2375 return Err(runtime_error_to_service_error(e));
2376 }
2377 };
2378
2379 if let Err(e) = runtime
2380 .restore_database(
2381 &target_id,
2382 &source_instance.engine,
2383 &source_instance.master_username,
2384 &source_instance.master_user_password,
2385 &db_name,
2386 &dump_data,
2387 )
2388 .await
2389 {
2390 self.state
2391 .write()
2392 .get_or_create(&request.account_id)
2393 .cancel_instance_creation(&target_id);
2394 runtime.stop_container(&target_id).await;
2395 return Err(runtime_error_to_service_error(e));
2396 }
2397
2398 let restore_to_time = required_query_param(request, "RestoreTime")
2399 .ok()
2400 .or_else(|| required_query_param(request, "RestoreToTime").ok());
2401 let use_latest = required_query_param(request, "UseLatestRestorableTime")
2402 .ok()
2403 .map(|s| s.eq_ignore_ascii_case("true"))
2404 .unwrap_or(false);
2405
2406 let mut instance = build_pit_restored_instance(
2407 &target_id,
2408 db_instance_arn,
2409 dbi_resource_id,
2410 created_at,
2411 vpc_security_group_ids,
2412 &source_instance,
2413 &running,
2414 tags,
2415 );
2416
2417 if let Some(t) = restore_to_time.as_ref() {
2418 if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(t) {
2419 instance.latest_restorable_time = Some(parsed.with_timezone(&Utc));
2420 }
2421 } else if use_latest {
2422 instance.latest_restorable_time = source_instance.latest_restorable_time;
2423 }
2424
2425 self.state
2426 .write()
2427 .get_or_create(&request.account_id)
2428 .finish_instance_creation(instance.clone());
2429
2430 self.emit_event(
2431 RdsSourceType::DbInstance,
2432 &target_id,
2433 &instance.db_instance_arn,
2434 "RDS-EVENT-0008",
2435 &["creation"],
2436 "DB instance restored to point in time",
2437 );
2438
2439 Ok(AwsResponse::xml(
2440 StatusCode::OK,
2441 query_response_xml(
2442 "RestoreDBInstanceToPointInTime",
2443 RDS_NS,
2444 &format!(
2445 "<DBInstance>{}</DBInstance>",
2446 db_instance_xml(&instance, None)
2447 ),
2448 &request.request_id,
2449 ),
2450 ))
2451 }
2452
2453 async fn restore_db_instance_from_s3(
2454 &self,
2455 request: &AwsRequest,
2456 ) -> Result<AwsResponse, AwsServiceError> {
2457 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
2458 let s3_bucket = required_query_param(request, "S3BucketName")?;
2459 let s3_prefix = optional_query_param(request, "S3Prefix").unwrap_or_default();
2460 let master_username = required_query_param(request, "MasterUsername")?;
2461 let master_user_password = required_query_param(request, "MasterUserPassword")?;
2462 let engine = required_query_param(request, "Engine")?;
2463 let engine_version = optional_query_param(request, "EngineVersion")
2464 .or_else(|| optional_query_param(request, "SourceEngineVersion"))
2465 .unwrap_or_else(|| match engine.as_str() {
2466 "postgres" => "16.3".to_string(),
2467 "mysql" => "8.0".to_string(),
2468 "mariadb" => "10.6".to_string(),
2469 _ => "0".to_string(),
2470 });
2471 let allocated_storage = optional_query_param(request, "AllocatedStorage")
2472 .and_then(|s| s.parse::<i32>().ok())
2473 .unwrap_or(20);
2474 let db_instance_class = optional_query_param(request, "DBInstanceClass")
2475 .unwrap_or_else(|| "db.t3.micro".to_string());
2476 let db_name_opt = optional_query_param(request, "DBName");
2477 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
2478 let tags = parse_tags(request)?;
2479
2480 let bus = self.delivery_bus.as_ref().ok_or_else(|| {
2481 AwsServiceError::aws_error(
2482 StatusCode::SERVICE_UNAVAILABLE,
2483 "InvalidParameterValue",
2484 "S3 client not wired into RDS service",
2485 )
2486 })?;
2487
2488 let dump_data = bus
2489 .get_object_from_s3(&request.account_id, &s3_bucket, &s3_prefix)
2490 .map_err(|e| {
2491 AwsServiceError::aws_error(
2492 StatusCode::BAD_REQUEST,
2493 "InvalidS3BucketFault",
2494 format!("S3 backup at {s3_bucket}/{s3_prefix} unavailable: {e}"),
2495 )
2496 })?;
2497
2498 let runtime = self.require_runtime()?;
2499
2500 let (dbi_resource_id, db_instance_arn) = {
2501 let mut accounts = self.state.write();
2502 let state = accounts.get_or_create(&request.account_id);
2503
2504 if !state.begin_instance_creation(&db_instance_identifier) {
2505 return Err(AwsServiceError::aws_error(
2506 StatusCode::CONFLICT,
2507 "DBInstanceAlreadyExists",
2508 format!("DBInstance {db_instance_identifier} already exists."),
2509 ));
2510 }
2511
2512 (
2513 state.next_dbi_resource_id(),
2514 state.db_instance_arn(&db_instance_identifier),
2515 )
2516 };
2517
2518 let db_name = db_name_opt.unwrap_or_else(|| default_db_name(&engine).to_string());
2519 let created_at = Utc::now();
2520
2521 let running = match runtime
2522 .ensure_postgres(
2523 &db_instance_identifier,
2524 &engine,
2525 &engine_version,
2526 &master_username,
2527 &master_user_password,
2528 &db_name,
2529 &request.account_id,
2530 &request.region,
2531 )
2532 .await
2533 {
2534 Ok(running) => running,
2535 Err(e) => {
2536 self.state
2537 .write()
2538 .get_or_create(&request.account_id)
2539 .cancel_instance_creation(&db_instance_identifier);
2540 return Err(runtime_error_to_service_error(e));
2541 }
2542 };
2543
2544 if let Err(e) = runtime
2545 .restore_database(
2546 &db_instance_identifier,
2547 &engine,
2548 &master_username,
2549 &master_user_password,
2550 &db_name,
2551 &dump_data,
2552 )
2553 .await
2554 {
2555 self.state
2556 .write()
2557 .get_or_create(&request.account_id)
2558 .cancel_instance_creation(&db_instance_identifier);
2559 runtime.stop_container(&db_instance_identifier).await;
2560 return Err(runtime_error_to_service_error(e));
2561 }
2562
2563 let instance = build_s3_restored_instance(
2564 &db_instance_identifier,
2565 db_instance_arn,
2566 dbi_resource_id,
2567 created_at,
2568 allocated_storage,
2569 db_instance_class,
2570 engine.clone(),
2571 engine_version,
2572 master_username,
2573 master_user_password,
2574 db_name,
2575 vpc_security_group_ids,
2576 &running,
2577 tags,
2578 );
2579
2580 self.state
2581 .write()
2582 .get_or_create(&request.account_id)
2583 .finish_instance_creation(instance.clone());
2584
2585 self.emit_event(
2586 RdsSourceType::DbInstance,
2587 &db_instance_identifier,
2588 &instance.db_instance_arn,
2589 "RDS-EVENT-0043",
2590 &["creation"],
2591 "DB instance restored from S3 backup",
2592 );
2593
2594 Ok(AwsResponse::xml(
2595 StatusCode::OK,
2596 query_response_xml(
2597 "RestoreDBInstanceFromS3",
2598 RDS_NS,
2599 &format!(
2600 "<DBInstance>{}</DBInstance>",
2601 db_instance_xml(&instance, None)
2602 ),
2603 &request.request_id,
2604 ),
2605 ))
2606 }
2607
2608 async fn create_db_cluster_snapshot(
2613 &self,
2614 request: &AwsRequest,
2615 ) -> Result<AwsResponse, AwsServiceError> {
2616 use serde_json::json;
2617 let snapshot_id = required_query_param(request, "DBClusterSnapshotIdentifier")?;
2618 let cluster_id = required_query_param(request, "DBClusterIdentifier")?;
2619 let arn = format!(
2620 "arn:aws:rds:{}:{}:cluster-snapshot:{}",
2621 request.region, request.account_id, snapshot_id
2622 );
2623
2624 let writer_info = {
2625 let accounts = self.state.read();
2626 accounts.get(&request.account_id).and_then(|state| {
2627 let cluster_entry = state.extras.get("clusters")?.get(&cluster_id)?;
2628 let writer_id = cluster_entry
2629 .get("WriterDBInstanceIdentifier")
2630 .and_then(|v| v.as_str())
2631 .map(str::to_string)
2632 .or_else(|| {
2633 cluster_entry
2634 .get("DBClusterMembers")
2635 .and_then(|m| m.as_array())
2636 .and_then(|arr| {
2637 arr.iter()
2638 .find(|m| m["IsClusterWriter"].as_bool() == Some(true))
2639 .or_else(|| arr.first())
2640 .and_then(|m| m["DBInstanceIdentifier"].as_str())
2641 .map(str::to_string)
2642 })
2643 })?;
2644 let inst = state.instances.get(&writer_id)?;
2645 Some((
2646 inst.db_instance_identifier.clone(),
2647 inst.engine.clone(),
2648 inst.master_username.clone(),
2649 inst.master_user_password.clone(),
2650 inst.db_name
2651 .clone()
2652 .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
2653 ))
2654 })
2655 };
2656
2657 let dump_b64 = if let Some((wid, eng, user, pass, db)) = writer_info {
2658 if let Some(runtime) = self.runtime_ref() {
2659 match runtime.dump_database(&wid, &eng, &user, &pass, &db).await {
2660 Ok(data) => {
2661 use base64::Engine;
2662 Some(base64::engine::general_purpose::STANDARD.encode(&data))
2663 }
2664 Err(e) => {
2665 tracing::warn!(
2666 error = %e,
2667 cluster = %cluster_id,
2668 writer = %wid,
2669 "cluster snapshot dump failed; falling back to metadata-only snapshot"
2670 );
2671 None
2672 }
2673 }
2674 } else {
2675 None
2676 }
2677 } else {
2678 None
2679 };
2680
2681 {
2682 let mut accounts = self.state.write();
2683 let state = accounts.get_or_create(&request.account_id);
2684 let mut entry = state
2685 .extras
2686 .get("clusters")
2687 .and_then(|m| m.get(&cluster_id))
2688 .cloned()
2689 .unwrap_or_else(|| json!({}));
2690 if let Some(obj) = entry.as_object_mut() {
2691 obj.insert(
2692 "DBClusterSnapshotIdentifier".to_string(),
2693 json!(snapshot_id),
2694 );
2695 obj.insert("DBClusterSnapshotArn".to_string(), json!(arn));
2696 obj.insert("DBClusterIdentifier".to_string(), json!(cluster_id));
2697 obj.insert("Status".to_string(), json!("available"));
2698 obj.insert("SnapshotType".to_string(), json!("manual"));
2699 if let Some(b64) = dump_b64.as_ref() {
2700 obj.insert("DumpDataB64".to_string(), json!(b64));
2701 }
2702 }
2703 state
2704 .extras
2705 .entry("cluster_snapshots".to_string())
2706 .or_default()
2707 .insert(snapshot_id.clone(), entry);
2708 }
2709
2710 self.emit_event(
2711 RdsSourceType::DbClusterSnapshot,
2712 &snapshot_id,
2713 &arn,
2714 "RDS-EVENT-0074",
2715 &["backup"],
2716 "DB cluster snapshot created",
2717 );
2718
2719 Ok(AwsResponse::xml(
2720 StatusCode::OK,
2721 query_response_xml(
2722 "CreateDBClusterSnapshot",
2723 RDS_NS,
2724 &crate::extras::cluster_snapshot_xml(&snapshot_id, &arn, &cluster_id),
2725 &request.request_id,
2726 ),
2727 ))
2728 }
2729
2730 async fn restore_db_cluster_from_snapshot(
2736 &self,
2737 request: &AwsRequest,
2738 ) -> Result<AwsResponse, AwsServiceError> {
2739 use serde_json::json;
2740 let target = required_query_param(request, "DBClusterIdentifier")?;
2741 let snapshot_id = optional_query_param(request, "SnapshotIdentifier")
2742 .or_else(|| optional_query_param(request, "DBClusterSnapshotIdentifier"))
2743 .ok_or_else(|| {
2744 AwsServiceError::aws_error(
2745 StatusCode::BAD_REQUEST,
2746 "MissingParameter",
2747 "SnapshotIdentifier is required",
2748 )
2749 })?;
2750 let arn = format!(
2751 "arn:aws:rds:{}:{}:cluster:{}",
2752 request.region, request.account_id, target
2753 );
2754
2755 let mut accounts = self.state.write();
2756 let state = accounts.get_or_create(&request.account_id);
2757 let snapshot = state
2758 .extras
2759 .get("cluster_snapshots")
2760 .and_then(|m| m.get(&snapshot_id))
2761 .cloned()
2762 .ok_or_else(|| {
2763 AwsServiceError::aws_error(
2764 StatusCode::NOT_FOUND,
2765 "DBClusterSnapshotNotFoundFault",
2766 format!("DBClusterSnapshot {snapshot_id} not found."),
2767 )
2768 })?;
2769 let source_cluster_id = snapshot
2770 .get("DBClusterIdentifier")
2771 .and_then(|v| v.as_str())
2772 .unwrap_or("")
2773 .to_string();
2774 let pending_dump_b64 = snapshot
2775 .get("DumpDataB64")
2776 .and_then(|v| v.as_str())
2777 .map(str::to_string);
2778
2779 let mut entry = state
2780 .extras
2781 .get("clusters")
2782 .and_then(|m| m.get(&source_cluster_id))
2783 .cloned()
2784 .unwrap_or_else(|| {
2785 json!({
2786 "Engine": optional_query_param(request, "Engine").unwrap_or_else(|| "aurora-postgresql".to_string()),
2787 "EngineVersion": optional_query_param(request, "EngineVersion").unwrap_or_else(|| "15.3".to_string()),
2788 "MasterUsername": "postgres",
2789 "Port": 5432,
2790 })
2791 });
2792 if let Some(obj) = entry.as_object_mut() {
2793 obj.insert("DBClusterIdentifier".to_string(), json!(target));
2794 obj.insert("DBClusterArn".to_string(), json!(arn));
2795 obj.insert("Status".to_string(), json!("available"));
2796 obj.insert(
2797 "Endpoint".to_string(),
2798 json!(format!(
2799 "{target}.cluster-xxx.{}.rds.amazonaws.com",
2800 request.region
2801 )),
2802 );
2803 obj.insert(
2804 "ReaderEndpoint".to_string(),
2805 json!(format!(
2806 "{target}.cluster-ro-xxx.{}.rds.amazonaws.com",
2807 request.region
2808 )),
2809 );
2810 obj.remove("ReplicationSourceIdentifier");
2811 obj.remove("DBClusterMembers");
2812 obj.remove("WriterDBInstanceIdentifier");
2813 obj.remove("DBClusterSnapshotIdentifier");
2814 obj.remove("DBClusterSnapshotArn");
2815 obj.remove("DumpDataB64");
2816 if let Some(engine) = optional_query_param(request, "Engine") {
2817 obj.insert("Engine".to_string(), json!(engine));
2818 }
2819 if let Some(version) = optional_query_param(request, "EngineVersion") {
2820 obj.insert("EngineVersion".to_string(), json!(version));
2821 }
2822 if let Some(port) =
2823 optional_query_param(request, "Port").and_then(|p| p.parse::<i64>().ok())
2824 {
2825 obj.insert("Port".to_string(), json!(port));
2826 }
2827 if let Some(b64) = pending_dump_b64 {
2828 obj.insert("PendingRestoreDumpB64".to_string(), json!(b64));
2829 }
2830 }
2831 state
2832 .extras
2833 .entry("clusters".to_string())
2834 .or_default()
2835 .insert(target.clone(), entry);
2836 drop(accounts);
2837
2838 self.emit_event(
2839 RdsSourceType::DbCluster,
2840 &target,
2841 &arn,
2842 "RDS-EVENT-0170",
2843 &["creation"],
2844 "DB cluster restored from snapshot",
2845 );
2846
2847 Ok(AwsResponse::xml(
2848 StatusCode::OK,
2849 query_response_xml(
2850 "RestoreDBClusterFromSnapshot",
2851 RDS_NS,
2852 &crate::extras::db_cluster_xml(&target, &arn),
2853 &request.request_id,
2854 ),
2855 ))
2856 }
2857
2858 async fn restore_db_cluster_to_point_in_time(
2863 &self,
2864 request: &AwsRequest,
2865 ) -> Result<AwsResponse, AwsServiceError> {
2866 use serde_json::json;
2867 let target = required_query_param(request, "DBClusterIdentifier")?;
2868 let source = required_query_param(request, "SourceDBClusterIdentifier")?;
2869 let arn = format!(
2870 "arn:aws:rds:{}:{}:cluster:{}",
2871 request.region, request.account_id, target
2872 );
2873
2874 let writer_info = {
2875 let accounts = self.state.read();
2876 accounts.get(&request.account_id).and_then(|state| {
2877 let cluster_entry = state.extras.get("clusters")?.get(&source)?;
2878 let writer_id = cluster_entry
2879 .get("WriterDBInstanceIdentifier")
2880 .and_then(|v| v.as_str())
2881 .map(str::to_string)
2882 .or_else(|| {
2883 cluster_entry
2884 .get("DBClusterMembers")
2885 .and_then(|m| m.as_array())
2886 .and_then(|arr| {
2887 arr.iter()
2888 .find(|m| m["IsClusterWriter"].as_bool() == Some(true))
2889 .or_else(|| arr.first())
2890 .and_then(|m| m["DBInstanceIdentifier"].as_str())
2891 .map(str::to_string)
2892 })
2893 })?;
2894 let inst = state.instances.get(&writer_id)?;
2895 Some((
2896 inst.db_instance_identifier.clone(),
2897 inst.engine.clone(),
2898 inst.master_username.clone(),
2899 inst.master_user_password.clone(),
2900 inst.db_name
2901 .clone()
2902 .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
2903 ))
2904 })
2905 };
2906
2907 let pending_dump_b64 = if let Some((wid, eng, user, pass, db)) = writer_info {
2908 if let Some(runtime) = self.runtime_ref() {
2909 match runtime.dump_database(&wid, &eng, &user, &pass, &db).await {
2910 Ok(data) => {
2911 use base64::Engine;
2912 Some(base64::engine::general_purpose::STANDARD.encode(&data))
2913 }
2914 Err(e) => {
2915 tracing::warn!(
2916 error = %e,
2917 cluster = %source,
2918 writer = %wid,
2919 "cluster PIT dump failed; falling back to metadata-only restore"
2920 );
2921 None
2922 }
2923 }
2924 } else {
2925 None
2926 }
2927 } else {
2928 None
2929 };
2930
2931 let mut accounts = self.state.write();
2932 let state = accounts.get_or_create(&request.account_id);
2933 let mut entry = state
2934 .extras
2935 .get("clusters")
2936 .and_then(|m| m.get(&source))
2937 .cloned()
2938 .ok_or_else(|| {
2939 AwsServiceError::aws_error(
2940 StatusCode::NOT_FOUND,
2941 "DBClusterNotFoundFault",
2942 format!("DBCluster {source} not found."),
2943 )
2944 })?;
2945 if let Some(obj) = entry.as_object_mut() {
2946 obj.insert("DBClusterIdentifier".to_string(), json!(target));
2947 obj.insert("DBClusterArn".to_string(), json!(arn));
2948 obj.insert("Status".to_string(), json!("available"));
2949 obj.insert(
2950 "Endpoint".to_string(),
2951 json!(format!(
2952 "{target}.cluster-xxx.{}.rds.amazonaws.com",
2953 request.region
2954 )),
2955 );
2956 obj.insert(
2957 "ReaderEndpoint".to_string(),
2958 json!(format!(
2959 "{target}.cluster-ro-xxx.{}.rds.amazonaws.com",
2960 request.region
2961 )),
2962 );
2963 obj.remove("DBClusterMembers");
2964 obj.remove("WriterDBInstanceIdentifier");
2965 if let Some(restore_time) = optional_query_param(request, "RestoreToTime") {
2966 obj.insert("RestoreToTime".to_string(), json!(restore_time));
2967 }
2968 if let Some(latest) = optional_query_param(request, "UseLatestRestorableTime") {
2969 obj.insert("UseLatestRestorableTime".to_string(), json!(latest));
2970 }
2971 if let Some(b64) = pending_dump_b64 {
2972 obj.insert("PendingRestoreDumpB64".to_string(), json!(b64));
2973 }
2974 }
2975 state
2976 .extras
2977 .entry("clusters".to_string())
2978 .or_default()
2979 .insert(target.clone(), entry);
2980 drop(accounts);
2981
2982 self.emit_event(
2983 RdsSourceType::DbCluster,
2984 &target,
2985 &arn,
2986 "RDS-EVENT-0171",
2987 &["creation"],
2988 "DB cluster restored to point in time",
2989 );
2990
2991 Ok(AwsResponse::xml(
2992 StatusCode::OK,
2993 query_response_xml(
2994 "RestoreDBClusterToPointInTime",
2995 RDS_NS,
2996 &crate::extras::db_cluster_xml(&target, &arn),
2997 &request.request_id,
2998 ),
2999 ))
3000 }
3001
3002 async fn describe_db_log_files(
3003 &self,
3004 request: &AwsRequest,
3005 ) -> Result<AwsResponse, AwsServiceError> {
3006 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
3007 let filename_contains = optional_query_param(request, "FilenameContains");
3008 let file_last_written =
3009 optional_query_param(request, "FileLastWritten").and_then(|s| s.parse::<i64>().ok());
3010 let file_size =
3011 optional_query_param(request, "FileSize").and_then(|s| s.parse::<i64>().ok());
3012
3013 let engine = {
3014 let accounts = self.state.read();
3015 let state = accounts
3016 .get(&request.account_id)
3017 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3018 let instance = state
3019 .instances
3020 .get(&db_instance_identifier)
3021 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3022 instance.engine.clone()
3023 };
3024
3025 let now_millis = Utc::now().timestamp_millis();
3031 let candidates: Vec<(String, i64, i64)> = match engine.as_str() {
3032 "mysql" | "mariadb" => vec![
3033 ("error/mysql-error.log".to_string(), now_millis, 1024),
3034 ("slowquery/mysql-slowquery.log".to_string(), now_millis, 512),
3035 ],
3036 _ => vec![
3037 ("error/postgres.log".to_string(), now_millis, 1024),
3038 ("trace/postgres-trace.log".to_string(), now_millis, 512),
3039 ],
3040 };
3041
3042 let filtered: Vec<(String, i64, i64)> = candidates
3043 .into_iter()
3044 .filter(|(name, written, size)| {
3045 if let Some(needle) = &filename_contains {
3046 if !name.contains(needle) {
3047 return false;
3048 }
3049 }
3050 if let Some(min_written) = file_last_written {
3051 if *written / 1000 <= min_written {
3055 return false;
3056 }
3057 }
3058 if let Some(min_size) = file_size {
3059 if *size < min_size {
3060 return false;
3061 }
3062 }
3063 true
3064 })
3065 .collect();
3066
3067 let details: String = filtered
3068 .iter()
3069 .map(|(name, written, size)| {
3070 format!(
3071 "<DescribeDBLogFilesDetails>\
3072 <LogFileName>{}</LogFileName>\
3073 <LastWritten>{}</LastWritten>\
3074 <Size>{}</Size>\
3075 </DescribeDBLogFilesDetails>",
3076 xml_escape(name),
3077 written,
3078 size,
3079 )
3080 })
3081 .collect();
3082
3083 Ok(AwsResponse::xml(
3084 StatusCode::OK,
3085 query_response_xml(
3086 "DescribeDBLogFiles",
3087 RDS_NS,
3088 &format!("<DescribeDBLogFiles>{details}</DescribeDBLogFiles>"),
3089 &request.request_id,
3090 ),
3091 ))
3092 }
3093
3094 async fn download_db_log_file_portion(
3095 &self,
3096 request: &AwsRequest,
3097 ) -> Result<AwsResponse, AwsServiceError> {
3098 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
3099 let log_file_name = required_query_param(request, "LogFileName")?;
3100 let _marker = optional_query_param(request, "Marker").unwrap_or_else(|| "0".to_string());
3101 let _number_of_lines = optional_query_param(request, "NumberOfLines")
3102 .and_then(|s| s.parse::<i64>().ok())
3103 .unwrap_or(0);
3104
3105 let engine = {
3106 let accounts = self.state.read();
3107 let state = accounts
3108 .get(&request.account_id)
3109 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3110 let instance = state
3111 .instances
3112 .get(&db_instance_identifier)
3113 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
3114 instance.engine.clone()
3115 };
3116
3117 let known_synthetic = matches!(
3118 (engine.as_str(), log_file_name.as_str()),
3119 ("mysql" | "mariadb", "error/mysql-error.log")
3120 | ("mysql" | "mariadb", "slowquery/mysql-slowquery.log")
3121 | (_, "error/postgres.log")
3122 | (_, "trace/postgres-trace.log")
3123 );
3124
3125 let container_path = map_log_file_to_container_path(&engine, &log_file_name);
3126
3127 let log_data = if let Some(runtime) = self.runtime.as_ref() {
3128 match runtime
3129 .read_log_file(&db_instance_identifier, &container_path)
3130 .await
3131 {
3132 Ok(bytes) => Some(bytes),
3133 Err(RuntimeError::Unavailable) => None,
3134 Err(RuntimeError::ContainerStartFailed(_)) if known_synthetic => Some(Vec::new()),
3135 Err(RuntimeError::ContainerStartFailed(message)) => {
3136 return Err(AwsServiceError::aws_error(
3137 StatusCode::NOT_FOUND,
3138 "DBLogFileNotFoundFault",
3139 format!("DBLogFile {log_file_name} not found: {message}"),
3140 ));
3141 }
3142 }
3143 } else if known_synthetic {
3144 Some(Vec::new())
3145 } else {
3146 None
3147 };
3148
3149 let log_data = match log_data {
3150 Some(bytes) => bytes,
3151 None => {
3152 return Err(AwsServiceError::aws_error(
3153 StatusCode::NOT_FOUND,
3154 "DBLogFileNotFoundFault",
3155 format!("DBLogFile {log_file_name} not found"),
3156 ))
3157 }
3158 };
3159
3160 let payload = String::from_utf8_lossy(&log_data).into_owned();
3161 let total_bytes = payload.len();
3162
3163 Ok(AwsResponse::xml(
3164 StatusCode::OK,
3165 query_response_xml(
3166 "DownloadDBLogFilePortion",
3167 RDS_NS,
3168 &format!(
3169 "<LogFileData>{}</LogFileData>\
3170 <Marker>{}</Marker>\
3171 <AdditionalDataPending>false</AdditionalDataPending>",
3172 xml_escape(&payload),
3173 total_bytes,
3174 ),
3175 &request.request_id,
3176 ),
3177 ))
3178 }
3179
3180 fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3181 let db_subnet_group_name = required_query_param(request, "DBSubnetGroupName")?;
3182 let db_subnet_group_description =
3183 required_query_param(request, "DBSubnetGroupDescription")?;
3184 let subnet_ids = parse_subnet_ids(request)?;
3185
3186 if subnet_ids.is_empty() {
3187 return Err(AwsServiceError::aws_error(
3188 StatusCode::BAD_REQUEST,
3189 "InvalidParameterValue",
3190 "At least one subnet must be specified.",
3191 ));
3192 }
3193
3194 if subnet_ids.len() < 2 {
3195 return Err(AwsServiceError::aws_error(
3196 StatusCode::BAD_REQUEST,
3197 "DBSubnetGroupDoesNotCoverEnoughAZs",
3198 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3199 ));
3200 }
3201
3202 let mut accounts = self.state.write();
3203 let state = accounts.get_or_create(&request.account_id);
3204
3205 if state.subnet_groups.contains_key(&db_subnet_group_name) {
3206 return Err(AwsServiceError::aws_error(
3207 StatusCode::CONFLICT,
3208 "DBSubnetGroupAlreadyExists",
3209 format!("DBSubnetGroup {db_subnet_group_name} already exists."),
3210 ));
3211 }
3212
3213 let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
3214 let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
3215 .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
3216 .collect();
3217
3218 let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
3220 if unique_azs.len() < 2 {
3221 return Err(AwsServiceError::aws_error(
3222 StatusCode::BAD_REQUEST,
3223 "DBSubnetGroupDoesNotCoverEnoughAZs",
3224 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3225 ));
3226 }
3227
3228 let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
3229 let tags = parse_tags(request)?;
3230
3231 let subnet_group = DbSubnetGroup {
3232 db_subnet_group_name: db_subnet_group_name.clone(),
3233 db_subnet_group_arn,
3234 db_subnet_group_description,
3235 vpc_id,
3236 subnet_ids,
3237 subnet_availability_zones,
3238 tags,
3239 };
3240
3241 state
3242 .subnet_groups
3243 .insert(db_subnet_group_name, subnet_group.clone());
3244
3245 Ok(AwsResponse::xml(
3246 StatusCode::OK,
3247 query_response_xml(
3248 "CreateDBSubnetGroup",
3249 RDS_NS,
3250 &format!(
3251 "<DBSubnetGroup>{}</DBSubnetGroup>",
3252 db_subnet_group_xml(&subnet_group)
3253 ),
3254 &request.request_id,
3255 ),
3256 ))
3257 }
3258
3259 fn describe_db_subnet_groups(
3260 &self,
3261 request: &AwsRequest,
3262 ) -> Result<AwsResponse, AwsServiceError> {
3263 let db_subnet_group_name = optional_query_param(request, "DBSubnetGroupName");
3264 let marker = optional_query_param(request, "Marker");
3265 let max_records = optional_query_param(request, "MaxRecords");
3266
3267 let accounts = self.state.read();
3268 let empty = RdsState::new(&request.account_id, &request.region);
3269 let state = accounts.get(&request.account_id).unwrap_or(&empty);
3270
3271 if let Some(name) = db_subnet_group_name {
3273 let sg = state.subnet_groups.get(&name).ok_or_else(|| {
3274 AwsServiceError::aws_error(
3275 StatusCode::NOT_FOUND,
3276 "DBSubnetGroupNotFoundFault",
3277 format!("DBSubnetGroup {} not found.", name),
3278 )
3279 })?;
3280
3281 return Ok(AwsResponse::xml(
3282 StatusCode::OK,
3283 query_response_xml(
3284 "DescribeDBSubnetGroups",
3285 RDS_NS,
3286 &format!(
3287 "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
3288 db_subnet_group_xml(sg)
3289 ),
3290 &request.request_id,
3291 ),
3292 ));
3293 }
3294
3295 let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
3297 subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
3298
3299 let paginated = paginate(subnet_groups, marker, max_records, |sg| {
3301 &sg.db_subnet_group_name
3302 })?;
3303
3304 let marker_xml = paginated
3305 .next_marker
3306 .as_ref()
3307 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
3308 .unwrap_or_default();
3309
3310 let body = paginated
3311 .items
3312 .iter()
3313 .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
3314 .collect::<Vec<_>>()
3315 .join("");
3316
3317 Ok(AwsResponse::xml(
3318 StatusCode::OK,
3319 query_response_xml(
3320 "DescribeDBSubnetGroups",
3321 RDS_NS,
3322 &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
3323 &request.request_id,
3324 ),
3325 ))
3326 }
3327
3328 fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3329 let db_subnet_group_name = required_query_param(request, "DBSubnetGroupName")?;
3330
3331 let mut accounts = self.state.write();
3332 let state = accounts.get_or_create(&request.account_id);
3333
3334 if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
3335 return Err(AwsServiceError::aws_error(
3336 StatusCode::NOT_FOUND,
3337 "DBSubnetGroupNotFoundFault",
3338 format!("DBSubnetGroup {db_subnet_group_name} not found."),
3339 ));
3340 }
3341
3342 Ok(AwsResponse::xml(
3343 StatusCode::OK,
3344 query_response_xml("DeleteDBSubnetGroup", RDS_NS, "", &request.request_id),
3345 ))
3346 }
3347
3348 fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3349 let db_subnet_group_name = required_query_param(request, "DBSubnetGroupName")?;
3350 let subnet_ids = parse_subnet_ids(request)?;
3351
3352 if subnet_ids.is_empty() {
3353 return Err(AwsServiceError::aws_error(
3354 StatusCode::BAD_REQUEST,
3355 "InvalidParameterValue",
3356 "At least one subnet must be specified.",
3357 ));
3358 }
3359
3360 if subnet_ids.len() < 2 {
3361 return Err(AwsServiceError::aws_error(
3362 StatusCode::BAD_REQUEST,
3363 "DBSubnetGroupDoesNotCoverEnoughAZs",
3364 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3365 ));
3366 }
3367
3368 let mut accounts = self.state.write();
3369 let state = accounts.get_or_create(&request.account_id);
3370
3371 let region = state.region.clone();
3372
3373 let subnet_group = state
3374 .subnet_groups
3375 .get_mut(&db_subnet_group_name)
3376 .ok_or_else(|| {
3377 AwsServiceError::aws_error(
3378 StatusCode::NOT_FOUND,
3379 "DBSubnetGroupNotFoundFault",
3380 format!("DBSubnetGroup {db_subnet_group_name} not found."),
3381 )
3382 })?;
3383
3384 let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
3385 .map(|i| format!("{}{}", ®ion, char::from(b'a' + (i % 6) as u8)))
3386 .collect();
3387
3388 let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
3390 if unique_azs.len() < 2 {
3391 return Err(AwsServiceError::aws_error(
3392 StatusCode::BAD_REQUEST,
3393 "DBSubnetGroupDoesNotCoverEnoughAZs",
3394 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
3395 ));
3396 }
3397
3398 subnet_group.subnet_ids = subnet_ids;
3399 subnet_group.subnet_availability_zones = subnet_availability_zones;
3400
3401 let subnet_group_clone = subnet_group.clone();
3402
3403 Ok(AwsResponse::xml(
3404 StatusCode::OK,
3405 query_response_xml(
3406 "ModifyDBSubnetGroup",
3407 RDS_NS,
3408 &format!(
3409 "<DBSubnetGroup>{}</DBSubnetGroup>",
3410 db_subnet_group_xml(&subnet_group_clone)
3411 ),
3412 &request.request_id,
3413 ),
3414 ))
3415 }
3416
3417 fn create_db_parameter_group(
3418 &self,
3419 request: &AwsRequest,
3420 ) -> Result<AwsResponse, AwsServiceError> {
3421 let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
3422 let db_parameter_group_family = required_query_param(request, "DBParameterGroupFamily")?;
3423 let description = required_query_param(request, "Description")?;
3424
3425 let valid_families = [
3427 "postgres16",
3428 "postgres15",
3429 "postgres14",
3430 "postgres13",
3431 "mysql8.0",
3432 "mysql5.7",
3433 "mariadb10.11",
3434 "mariadb10.6",
3435 ];
3436
3437 if !valid_families.contains(&db_parameter_group_family.as_str()) {
3438 return Err(AwsServiceError::aws_error(
3439 StatusCode::BAD_REQUEST,
3440 "InvalidParameterValue",
3441 format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
3442 ));
3443 }
3444
3445 let mut accounts = self.state.write();
3446 let state = accounts.get_or_create(&request.account_id);
3447
3448 if state
3449 .parameter_groups
3450 .contains_key(&db_parameter_group_name)
3451 {
3452 return Err(AwsServiceError::aws_error(
3453 StatusCode::CONFLICT,
3454 "DBParameterGroupAlreadyExists",
3455 format!("DBParameterGroup {db_parameter_group_name} already exists."),
3456 ));
3457 }
3458
3459 let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
3460 let tags = parse_tags(request)?;
3461
3462 let parameter_group = DbParameterGroup {
3463 db_parameter_group_name: db_parameter_group_name.clone(),
3464 db_parameter_group_arn,
3465 db_parameter_group_family,
3466 description,
3467 parameters: std::collections::BTreeMap::new(),
3468 tags,
3469 };
3470
3471 state
3472 .parameter_groups
3473 .insert(db_parameter_group_name.clone(), parameter_group.clone());
3474 let arn = parameter_group.db_parameter_group_arn.clone();
3475 drop(accounts);
3476
3477 self.emit_event(
3478 RdsSourceType::DbParameterGroup,
3479 &db_parameter_group_name,
3480 &arn,
3481 "RDS-EVENT-0179",
3482 &["creation"],
3483 "DB parameter group created",
3484 );
3485
3486 Ok(AwsResponse::xml(
3487 StatusCode::OK,
3488 query_response_xml(
3489 "CreateDBParameterGroup",
3490 RDS_NS,
3491 &format!(
3492 "<DBParameterGroup>{}</DBParameterGroup>",
3493 db_parameter_group_xml(¶meter_group)
3494 ),
3495 &request.request_id,
3496 ),
3497 ))
3498 }
3499
3500 fn describe_db_parameter_groups(
3501 &self,
3502 request: &AwsRequest,
3503 ) -> Result<AwsResponse, AwsServiceError> {
3504 let db_parameter_group_name = optional_query_param(request, "DBParameterGroupName");
3505 let marker = optional_query_param(request, "Marker");
3506 let max_records = optional_query_param(request, "MaxRecords");
3507
3508 let accounts = self.state.read();
3509 let empty = RdsState::new(&request.account_id, &request.region);
3510 let state = accounts.get(&request.account_id).unwrap_or(&empty);
3511
3512 if let Some(name) = db_parameter_group_name {
3514 let pg = state.parameter_groups.get(&name).ok_or_else(|| {
3515 AwsServiceError::aws_error(
3516 StatusCode::NOT_FOUND,
3517 "DBParameterGroupNotFound",
3518 format!("DBParameterGroup {} not found.", name),
3519 )
3520 })?;
3521
3522 return Ok(AwsResponse::xml(
3523 StatusCode::OK,
3524 query_response_xml(
3525 "DescribeDBParameterGroups", RDS_NS,
3526 &format!(
3527 "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
3528 db_parameter_group_xml(pg)
3529 ),
3530 &request.request_id,
3531 ),
3532 ));
3533 }
3534
3535 let mut parameter_groups: Vec<DbParameterGroup> =
3537 state.parameter_groups.values().cloned().collect();
3538 parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
3539
3540 let paginated = paginate(parameter_groups, marker, max_records, |pg| {
3542 &pg.db_parameter_group_name
3543 })?;
3544
3545 let marker_xml = paginated
3546 .next_marker
3547 .as_ref()
3548 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
3549 .unwrap_or_default();
3550
3551 let body = paginated
3552 .items
3553 .iter()
3554 .map(|pg| {
3555 format!(
3556 "<DBParameterGroup>{}</DBParameterGroup>",
3557 db_parameter_group_xml(pg)
3558 )
3559 })
3560 .collect::<Vec<_>>()
3561 .join("");
3562
3563 Ok(AwsResponse::xml(
3564 StatusCode::OK,
3565 query_response_xml(
3566 "DescribeDBParameterGroups",
3567 RDS_NS,
3568 &format!(
3569 "<DBParameterGroups>{}</DBParameterGroups>{}",
3570 body, marker_xml
3571 ),
3572 &request.request_id,
3573 ),
3574 ))
3575 }
3576
3577 fn delete_db_parameter_group(
3578 &self,
3579 request: &AwsRequest,
3580 ) -> Result<AwsResponse, AwsServiceError> {
3581 let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
3582
3583 let arn = {
3584 let mut accounts = self.state.write();
3585 let state = accounts.get_or_create(&request.account_id);
3586
3587 if db_parameter_group_name.starts_with("default.") {
3588 return Err(AwsServiceError::aws_error(
3589 StatusCode::BAD_REQUEST,
3590 "InvalidParameterValue",
3591 "Cannot delete default parameter groups.",
3592 ));
3593 }
3594
3595 let removed = state
3596 .parameter_groups
3597 .remove(&db_parameter_group_name)
3598 .ok_or_else(|| {
3599 AwsServiceError::aws_error(
3600 StatusCode::NOT_FOUND,
3601 "DBParameterGroupNotFound",
3602 format!("DBParameterGroup {db_parameter_group_name} not found."),
3603 )
3604 })?;
3605 removed.db_parameter_group_arn
3606 };
3607
3608 self.emit_event(
3609 RdsSourceType::DbParameterGroup,
3610 &db_parameter_group_name,
3611 &arn,
3612 "RDS-EVENT-0064",
3613 &["deletion"],
3614 "DB parameter group deleted",
3615 );
3616
3617 Ok(AwsResponse::xml(
3618 StatusCode::OK,
3619 query_response_xml("DeleteDBParameterGroup", RDS_NS, "", &request.request_id),
3620 ))
3621 }
3622
3623 fn modify_db_parameter_group(
3624 &self,
3625 request: &AwsRequest,
3626 ) -> Result<AwsResponse, AwsServiceError> {
3627 let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
3628
3629 let parsed_params = parse_db_parameter_members(request);
3634
3635 let mut accounts = self.state.write();
3636 let state = accounts.get_or_create(&request.account_id);
3637
3638 let parameter_group = state
3639 .parameter_groups
3640 .get_mut(&db_parameter_group_name)
3641 .ok_or_else(|| {
3642 AwsServiceError::aws_error(
3643 StatusCode::NOT_FOUND,
3644 "DBParameterGroupNotFound",
3645 format!("DBParameterGroup {db_parameter_group_name} not found."),
3646 )
3647 })?;
3648
3649 if let Some(new_description) = optional_query_param(request, "Description") {
3650 parameter_group.description = new_description;
3651 }
3652
3653 for (name, value) in parsed_params {
3654 parameter_group.parameters.insert(name, value);
3655 }
3656
3657 let parameter_group_clone = parameter_group.clone();
3658 let arn = parameter_group_clone.db_parameter_group_arn.clone();
3659 drop(accounts);
3660
3661 self.emit_event(
3662 RdsSourceType::DbParameterGroup,
3663 &db_parameter_group_name,
3664 &arn,
3665 "RDS-EVENT-0037",
3666 &["configuration change"],
3667 "DB parameter group modified",
3668 );
3669
3670 Ok(AwsResponse::xml(
3671 StatusCode::OK,
3672 query_response_xml(
3673 "ModifyDBParameterGroup",
3674 RDS_NS,
3675 &format!(
3676 "<DBParameterGroupName>{}</DBParameterGroupName>",
3677 xml_escape(¶meter_group_clone.db_parameter_group_name)
3678 ),
3679 &request.request_id,
3680 ),
3681 ))
3682 }
3683
3684 fn describe_db_parameters_real(
3685 &self,
3686 request: &AwsRequest,
3687 ) -> Result<AwsResponse, AwsServiceError> {
3688 let db_parameter_group_name = required_query_param(request, "DBParameterGroupName")?;
3689 let source_filter = optional_query_param(request, "Source");
3690
3691 let accounts = self.state.read();
3692 let state = match accounts.get(&request.account_id) {
3693 Some(s) => s,
3694 None => {
3695 return Err(AwsServiceError::aws_error(
3696 StatusCode::NOT_FOUND,
3697 "DBParameterGroupNotFound",
3698 format!("DBParameterGroup {db_parameter_group_name} not found."),
3699 ));
3700 }
3701 };
3702 let parameter_group = state
3703 .parameter_groups
3704 .get(&db_parameter_group_name)
3705 .ok_or_else(|| {
3706 AwsServiceError::aws_error(
3707 StatusCode::NOT_FOUND,
3708 "DBParameterGroupNotFound",
3709 format!("DBParameterGroup {db_parameter_group_name} not found."),
3710 )
3711 })?;
3712
3713 let source = source_filter.as_deref();
3721 let include_user = source.is_none_or(|s| s == "user");
3722 let include_engine_default = source.is_none_or(|s| s == "engine-default");
3723 let mut members_xml = String::new();
3724 if include_user {
3725 for (name, value) in ¶meter_group.parameters {
3726 members_xml.push_str(&render_user_parameter_xml(name, value));
3727 }
3728 }
3729 if include_engine_default {
3730 for default in
3735 crate::state::engine_default_parameters(¶meter_group.db_parameter_group_family)
3736 {
3737 if parameter_group.parameters.contains_key(default.name) {
3738 continue;
3739 }
3740 members_xml.push_str(&render_engine_default_parameter_xml(default));
3741 }
3742 }
3743 let body = format!(" <Parameters>\n{members_xml} </Parameters>");
3744 Ok(AwsResponse::xml(
3745 StatusCode::OK,
3746 query_response_xml("DescribeDBParameters", RDS_NS, &body, &request.request_id),
3747 ))
3748 }
3749}
3750
3751pub(crate) fn render_user_parameter_xml(name: &str, value: &str) -> String {
3755 format!(
3756 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>user</Source>\n <ApplyType>dynamic</ApplyType>\n <DataType>string</DataType>\n <IsModifiable>true</IsModifiable>\n </Parameter>\n",
3757 xml_escape(name),
3758 xml_escape(value),
3759 )
3760}
3761
3762pub(crate) fn render_engine_default_parameter_xml(
3766 default: &crate::state::EngineDefaultParameter,
3767) -> String {
3768 format!(
3769 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>engine-default</Source>\n <ApplyType>{}</ApplyType>\n <DataType>{}</DataType>\n <AllowedValues>{}</AllowedValues>\n <IsModifiable>{}</IsModifiable>\n </Parameter>\n",
3770 xml_escape(default.name),
3771 xml_escape(default.value),
3772 xml_escape(default.apply_type),
3773 xml_escape(default.data_type),
3774 xml_escape(default.allowed_values),
3775 default.is_modifiable,
3776 )
3777}
3778
3779pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<(String, String)> {
3787 let mut out = Vec::new();
3788 for prefix in ["Parameters.Parameter", "Parameters.member"] {
3789 let mut index = 1;
3790 loop {
3791 let name_key = format!("{prefix}.{index}.ParameterName");
3792 let value_key = format!("{prefix}.{index}.ParameterValue");
3793 let name = optional_query_param(request, &name_key);
3794 let value = optional_query_param(request, &value_key);
3795 if name.is_none() && value.is_none() {
3796 break;
3797 }
3798 if let (Some(n), Some(v)) = (name, value) {
3799 if !n.is_empty() {
3800 out.push((n, v));
3801 }
3802 }
3803 index += 1;
3804 }
3805 }
3806 out
3807}
3808
/// Translates an RDS log file name into the path read inside the container.
///
/// The two PostgreSQL log names map to the same PostgreSQL log path for any
/// engine. The MySQL error and slow-query names are translated only when the
/// engine is `mysql` or `mariadb`. Every other name is passed through
/// unchanged.
fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
    let is_mysql_family = matches!(engine, "mysql" | "mariadb");
    let container_path = match log_file_name {
        "error/postgres.log" | "trace/postgres-trace.log" => {
            "/var/log/postgresql/postgresql.log"
        }
        "error/mysql-error.log" if is_mysql_family => "/var/log/mysql/error.log",
        "slowquery/mysql-slowquery.log" if is_mysql_family => "/var/log/mysql/slow.log",
        unmapped => unmapped,
    };
    container_path.to_owned()
}
3823
/// One page of results, as consumed by the `Describe*` handlers in this file.
// NOTE(review): presumably produced by `paginate` — confirm against its definition.
pub(crate) struct PaginationResult<T> {
    /// Entries belonging to the current page.
    items: Vec<T>,
    /// When present, the handlers emit it as the `<Marker>` element of the response.
    next_marker: Option<String>,
}
3828
3829fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
3833 use serde_json::{json, Value};
3834 let Some(map) = state.extras.get_mut("clusters") else {
3835 return;
3836 };
3837 let Some(entry) = map.get_mut(cluster_id) else {
3838 return;
3839 };
3840 let Some(obj) = entry.as_object_mut() else {
3841 return;
3842 };
3843 let mut members: Vec<Value> = obj
3844 .get("DBClusterMembers")
3845 .and_then(|v| v.as_array())
3846 .cloned()
3847 .unwrap_or_default();
3848 if members
3849 .iter()
3850 .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
3851 {
3852 return;
3853 }
3854 let has_writer = members
3855 .iter()
3856 .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
3857 let promotion_tier = (members.len() as i64) + 1;
3858 members.push(json!({
3859 "DBInstanceIdentifier": instance_id,
3860 "IsClusterWriter": !has_writer,
3861 "DBClusterParameterGroupStatus": "in-sync",
3862 "PromotionTier": promotion_tier,
3863 }));
3864 obj.insert("DBClusterMembers".to_string(), Value::Array(members));
3865 if !has_writer {
3866 obj.insert(
3867 "WriterDBInstanceIdentifier".to_string(),
3868 Value::String(instance_id.to_string()),
3869 );
3870 }
3871}
3872
3873#[path = "service_helpers.rs"]
3874mod service_helpers;
3875pub(crate) use service_helpers::*;
3876
3877#[cfg(test)]
3878#[path = "service_tests.rs"]
3879mod tests;