1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::query_response_xml;
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18 default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19 DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20 SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
23const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
25fn is_mutating_action(action: &str) -> bool {
26 if matches!(
27 action,
28 "AddTagsToResource"
29 | "CreateDBInstance"
30 | "CreateDBInstanceReadReplica"
31 | "CreateDBParameterGroup"
32 | "CreateDBSnapshot"
33 | "CreateDBSubnetGroup"
34 | "DeleteDBInstance"
35 | "DeleteDBParameterGroup"
36 | "DeleteDBSnapshot"
37 | "DeleteDBSubnetGroup"
38 | "ModifyDBInstance"
39 | "ModifyDBParameterGroup"
40 | "ModifyDBSubnetGroup"
41 | "RebootDBInstance"
42 | "RemoveTagsFromResource"
43 | "RestoreDBInstanceFromDBSnapshot"
44 ) {
45 return true;
46 }
47 let mutating_prefixes = [
49 "Create",
50 "Modify",
51 "Delete",
52 "Reboot",
53 "Start",
54 "Stop",
55 "Failover",
56 "Switchover",
57 "Promote",
58 "Reset",
59 "Apply",
60 "Authorize",
61 "Revoke",
62 "Add",
63 "Remove",
64 "Register",
65 "Deregister",
66 "Copy",
67 "Restore",
68 "Backtrack",
69 "Cancel",
70 "Purchase",
71 "Disable",
72 "Enable",
73 ];
74 mutating_prefixes.iter().any(|p| action.starts_with(p))
75}
76const SUPPORTED_ACTIONS: &[&str] = &[
77 "AddRoleToDBCluster",
78 "AddRoleToDBInstance",
79 "AddSourceIdentifierToSubscription",
80 "AddTagsToResource",
81 "ApplyPendingMaintenanceAction",
82 "AuthorizeDBSecurityGroupIngress",
83 "BacktrackDBCluster",
84 "CancelExportTask",
85 "CopyDBClusterParameterGroup",
86 "CopyDBClusterSnapshot",
87 "CopyDBParameterGroup",
88 "CopyDBSnapshot",
89 "CopyOptionGroup",
90 "CreateBlueGreenDeployment",
91 "CreateCustomDBEngineVersion",
92 "CreateDBCluster",
93 "CreateDBClusterEndpoint",
94 "CreateDBClusterParameterGroup",
95 "CreateDBClusterSnapshot",
96 "CreateDBInstance",
97 "CreateDBInstanceReadReplica",
98 "CreateDBParameterGroup",
99 "CreateDBProxy",
100 "CreateDBProxyEndpoint",
101 "CreateDBSecurityGroup",
102 "CreateDBShardGroup",
103 "CreateDBSnapshot",
104 "CreateDBSubnetGroup",
105 "CreateEventSubscription",
106 "CreateGlobalCluster",
107 "CreateIntegration",
108 "CreateOptionGroup",
109 "CreateTenantDatabase",
110 "DeleteBlueGreenDeployment",
111 "DeleteCustomDBEngineVersion",
112 "DeleteDBCluster",
113 "DeleteDBClusterAutomatedBackup",
114 "DeleteDBClusterEndpoint",
115 "DeleteDBClusterParameterGroup",
116 "DeleteDBClusterSnapshot",
117 "DeleteDBInstance",
118 "DeleteDBInstanceAutomatedBackup",
119 "DeleteDBParameterGroup",
120 "DeleteDBProxy",
121 "DeleteDBProxyEndpoint",
122 "DeleteDBSecurityGroup",
123 "DeleteDBShardGroup",
124 "DeleteDBSnapshot",
125 "DeleteDBSubnetGroup",
126 "DeleteEventSubscription",
127 "DeleteGlobalCluster",
128 "DeleteIntegration",
129 "DeleteOptionGroup",
130 "DeleteTenantDatabase",
131 "DeregisterDBProxyTargets",
132 "DescribeAccountAttributes",
133 "DescribeBlueGreenDeployments",
134 "DescribeCertificates",
135 "DescribeDBClusterAutomatedBackups",
136 "DescribeDBClusterBacktracks",
137 "DescribeDBClusterEndpoints",
138 "DescribeDBClusterParameterGroups",
139 "DescribeDBClusterParameters",
140 "DescribeDBClusterSnapshotAttributes",
141 "DescribeDBClusterSnapshots",
142 "DescribeDBClusters",
143 "DescribeDBEngineVersions",
144 "DescribeDBInstanceAutomatedBackups",
145 "DescribeDBInstances",
146 "DescribeDBLogFiles",
147 "DescribeDBMajorEngineVersions",
148 "DescribeDBParameterGroups",
149 "DescribeDBParameters",
150 "DescribeDBProxies",
151 "DescribeDBProxyEndpoints",
152 "DescribeDBProxyTargetGroups",
153 "DescribeDBProxyTargets",
154 "DescribeDBRecommendations",
155 "DescribeDBSecurityGroups",
156 "DescribeDBShardGroups",
157 "DescribeDBSnapshotAttributes",
158 "DescribeDBSnapshotTenantDatabases",
159 "DescribeDBSnapshots",
160 "DescribeDBSubnetGroups",
161 "DescribeEngineDefaultClusterParameters",
162 "DescribeEngineDefaultParameters",
163 "DescribeEventCategories",
164 "DescribeEventSubscriptions",
165 "DescribeEvents",
166 "DescribeExportTasks",
167 "DescribeGlobalClusters",
168 "DescribeIntegrations",
169 "DescribeOptionGroupOptions",
170 "DescribeOptionGroups",
171 "DescribeOrderableDBInstanceOptions",
172 "DescribePendingMaintenanceActions",
173 "DescribeReservedDBInstances",
174 "DescribeReservedDBInstancesOfferings",
175 "DescribeSourceRegions",
176 "DescribeTenantDatabases",
177 "DescribeValidDBInstanceModifications",
178 "DisableHttpEndpoint",
179 "DownloadDBLogFilePortion",
180 "EnableHttpEndpoint",
181 "FailoverDBCluster",
182 "FailoverGlobalCluster",
183 "ListTagsForResource",
184 "ModifyActivityStream",
185 "ModifyCertificates",
186 "ModifyCurrentDBClusterCapacity",
187 "ModifyCustomDBEngineVersion",
188 "ModifyDBCluster",
189 "ModifyDBClusterEndpoint",
190 "ModifyDBClusterParameterGroup",
191 "ModifyDBClusterSnapshotAttribute",
192 "ModifyDBInstance",
193 "ModifyDBParameterGroup",
194 "ModifyDBProxy",
195 "ModifyDBProxyEndpoint",
196 "ModifyDBProxyTargetGroup",
197 "ModifyDBRecommendation",
198 "ModifyDBShardGroup",
199 "ModifyDBSnapshot",
200 "ModifyDBSnapshotAttribute",
201 "ModifyDBSubnetGroup",
202 "ModifyEventSubscription",
203 "ModifyGlobalCluster",
204 "ModifyIntegration",
205 "ModifyOptionGroup",
206 "ModifyTenantDatabase",
207 "PromoteReadReplica",
208 "PromoteReadReplicaDBCluster",
209 "PurchaseReservedDBInstancesOffering",
210 "RebootDBCluster",
211 "RebootDBInstance",
212 "RebootDBShardGroup",
213 "RegisterDBProxyTargets",
214 "RemoveFromGlobalCluster",
215 "RemoveRoleFromDBCluster",
216 "RemoveRoleFromDBInstance",
217 "RemoveSourceIdentifierFromSubscription",
218 "RemoveTagsFromResource",
219 "ResetDBClusterParameterGroup",
220 "ResetDBParameterGroup",
221 "RestoreDBClusterFromS3",
222 "RestoreDBClusterFromSnapshot",
223 "RestoreDBClusterToPointInTime",
224 "RestoreDBInstanceFromDBSnapshot",
225 "RestoreDBInstanceFromS3",
226 "RestoreDBInstanceToPointInTime",
227 "RevokeDBSecurityGroupIngress",
228 "StartActivityStream",
229 "StartDBCluster",
230 "StartDBInstance",
231 "StartDBInstanceAutomatedBackupsReplication",
232 "StartExportTask",
233 "StopActivityStream",
234 "StopDBCluster",
235 "StopDBInstance",
236 "StopDBInstanceAutomatedBackupsReplication",
237 "SwitchoverBlueGreenDeployment",
238 "SwitchoverGlobalCluster",
239 "SwitchoverReadReplica",
240];
241
242pub struct RdsService {
243 pub(crate) state: SharedRdsState,
244 runtime: Option<Arc<RdsRuntime>>,
245 snapshot_store: Option<Arc<dyn SnapshotStore>>,
246 snapshot_lock: Arc<AsyncMutex<()>>,
247 pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
248}
249
250#[derive(Clone, Copy)]
252#[allow(dead_code, clippy::enum_variant_names)]
253pub(crate) enum RdsSourceType {
254 DbInstance,
255 DbSnapshot,
256 DbParameterGroup,
257}
258
259impl RdsSourceType {
260 fn as_str(self) -> &'static str {
261 match self {
262 Self::DbInstance => "DB_INSTANCE",
263 Self::DbSnapshot => "DB_SNAPSHOT",
264 Self::DbParameterGroup => "DB_PARAMETER_GROUP",
265 }
266 }
267
268 fn detail_type(self) -> &'static str {
269 match self {
270 Self::DbInstance => "RDS DB Instance Event",
271 Self::DbSnapshot => "RDS DB Snapshot Event",
272 Self::DbParameterGroup => "RDS DB Parameter Group Event",
273 }
274 }
275}
276
277impl RdsService {
278 pub(crate) fn state_handle(&self) -> &SharedRdsState {
279 &self.state
280 }
281}
282
283impl RdsService {
284 pub fn new(state: SharedRdsState) -> Self {
285 Self {
286 state,
287 runtime: None,
288 snapshot_store: None,
289 snapshot_lock: Arc::new(AsyncMutex::new(())),
290 delivery_bus: None,
291 }
292 }
293
294 pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
295 self.runtime = Some(runtime);
296 self
297 }
298
299 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
300 self.snapshot_store = Some(store);
301 self
302 }
303
304 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
305 self.delivery_bus = Some(bus);
306 self
307 }
308
309 pub(crate) fn emit_event(
312 &self,
313 source_type: RdsSourceType,
314 source_identifier: &str,
315 source_arn: &str,
316 event_id: &str,
317 event_categories: &[&str],
318 message: &str,
319 ) {
320 emit_event_static(
321 self.delivery_bus.as_ref(),
322 source_type,
323 source_identifier,
324 source_arn,
325 event_id,
326 event_categories,
327 message,
328 );
329 }
330
331 async fn save_snapshot(&self) {
332 let Some(store) = self.snapshot_store.clone() else {
333 return;
334 };
335 let _guard = self.snapshot_lock.lock().await;
336 let snapshot = RdsSnapshot {
337 schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
338 state: None,
339 accounts: Some(self.state.read().clone()),
340 };
341 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
342 let bytes = serde_json::to_vec(&snapshot)
343 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
344 store.save(&bytes)
345 })
346 .await;
347 match join {
348 Ok(Ok(())) => {}
349 Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
350 Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
351 }
352 }
353
354 fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
360 self.runtime.as_ref().ok_or_else(|| {
361 AwsServiceError::aws_error(
362 StatusCode::SERVICE_UNAVAILABLE,
363 "InvalidParameterValue",
364 "Docker/Podman is required for RDS DB instances but is not available",
365 )
366 })
367 }
368}
369
370#[async_trait]
371impl AwsService for RdsService {
372 fn service_name(&self) -> &str {
373 "rds"
374 }
375
376 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
377 let mutates = is_mutating_action(request.action.as_str());
378 let result = match request.action.as_str() {
379 "AddTagsToResource" => self.add_tags_to_resource(&request),
380 "CreateDBInstance" => self.create_db_instance(&request).await,
381 "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
382 "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
383 "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
384 "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
385 "DeleteDBInstance" => self.delete_db_instance(&request).await,
386 "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
387 "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
388 "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
389 "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
390 "DescribeDBInstances" => self.describe_db_instances(&request),
391 "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
392 "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
393 "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
394 "DescribeOrderableDBInstanceOptions" => {
395 self.describe_orderable_db_instance_options(&request)
396 }
397 "ListTagsForResource" => self.list_tags_for_resource(&request),
398 "ModifyDBInstance" => self.modify_db_instance(&request),
399 "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
400 "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
401 "RebootDBInstance" => self.reboot_db_instance(&request).await,
402 "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
403 "RestoreDBInstanceFromDBSnapshot" => {
404 self.restore_db_instance_from_db_snapshot(&request).await
405 }
406 _ => self.handle_extra_action(&request),
407 };
408 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
409 self.save_snapshot().await;
410 }
411 result
412 }
413
414 fn supported_actions(&self) -> &[&str] {
415 SUPPORTED_ACTIONS
416 }
417}
418
419impl RdsService {
420 async fn create_db_instance(
421 &self,
422 request: &AwsRequest,
423 ) -> Result<AwsResponse, AwsServiceError> {
424 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
425 let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
426 let db_instance_class = required_param(request, "DBInstanceClass")?;
427 let engine = required_param(request, "Engine")?;
428 let master_username = required_param(request, "MasterUsername")?;
429 let master_user_password = required_param(request, "MasterUserPassword")?;
430 let db_name = optional_param(request, "DBName");
431 let engine_version =
432 optional_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
433 let publicly_accessible =
434 parse_optional_bool(optional_param(request, "PubliclyAccessible").as_deref())?
435 .unwrap_or(true);
436 let deletion_protection =
437 parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?
438 .unwrap_or(false);
439 let port = optional_i32_param(request, "Port")?
440 .unwrap_or_else(|| default_port_for_engine(&engine));
441 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
442
443 let db_parameter_group_name = optional_param(request, "DBParameterGroupName")
444 .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
445
446 let backup_retention_period =
447 optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
448 let preferred_backup_window = optional_param(request, "PreferredBackupWindow")
449 .unwrap_or_else(|| "03:00-04:00".to_string());
450 let option_group_name = optional_param(request, "OptionGroupName");
451 let multi_az =
452 parse_optional_bool(optional_param(request, "MultiAZ").as_deref())?.unwrap_or(false);
453
454 validate_create_request(
455 &db_instance_identifier,
456 allocated_storage,
457 &db_instance_class,
458 &engine,
459 &engine_version,
460 port,
461 )?;
462
463 {
464 let mut accounts = self.state.write();
465 let state = accounts.get_or_create(&request.account_id);
466 if !state.begin_instance_creation(&db_instance_identifier) {
467 return Err(AwsServiceError::aws_error(
468 StatusCode::BAD_REQUEST,
469 "DBInstanceAlreadyExists",
470 format!("DBInstance {} already exists.", db_instance_identifier),
471 ));
472 }
473 if let Some(ref pg_name) = db_parameter_group_name {
475 if !state.parameter_groups.contains_key(pg_name) {
476 state.cancel_instance_creation(&db_instance_identifier);
477 return Err(AwsServiceError::aws_error(
478 StatusCode::NOT_FOUND,
479 "DBParameterGroupNotFound",
480 format!("DBParameterGroup {} not found.", pg_name),
481 ));
482 }
483 }
484 }
485
486 let runtime = self.require_runtime()?.clone();
487
488 let logical_db_name = db_name
489 .clone()
490 .unwrap_or_else(|| default_db_name(&engine).to_string());
491
492 let created_at = Utc::now();
498 let instance = {
499 let mut accounts = self.state.write();
500 let state = accounts.get_or_create(&request.account_id);
501 let placeholder = DbInstance {
502 db_instance_identifier: db_instance_identifier.clone(),
503 db_instance_arn: state.db_instance_arn(&db_instance_identifier),
504 db_instance_class: db_instance_class.clone(),
505 engine: engine.clone(),
506 engine_version: engine_version.clone(),
507 db_instance_status: "creating".to_string(),
508 master_username: master_username.clone(),
509 db_name: db_name.clone(),
510 endpoint_address: String::new(),
511 port: 0,
512 allocated_storage,
513 publicly_accessible,
514 deletion_protection,
515 created_at,
516 dbi_resource_id: state.next_dbi_resource_id(),
517 master_user_password: master_user_password.clone(),
518 container_id: String::new(),
519 host_port: 0,
520 tags: Vec::new(),
521 read_replica_source_db_instance_identifier: None,
522 read_replica_db_instance_identifiers: Vec::new(),
523 vpc_security_group_ids,
524 db_parameter_group_name,
525 backup_retention_period,
526 preferred_backup_window,
527 latest_restorable_time: if backup_retention_period > 0 {
528 Some(created_at)
529 } else {
530 None
531 },
532 option_group_name,
533 multi_az,
534 pending_modified_values: None,
535 };
536 state.finish_instance_creation(placeholder.clone());
537 placeholder
538 };
539 let instance_arn = instance.db_instance_arn.clone();
540
541 self.emit_event(
542 RdsSourceType::DbInstance,
543 &db_instance_identifier,
544 &instance_arn,
545 "RDS-EVENT-0005",
546 &["creation"],
547 "DB instance created",
548 );
549
550 {
551 let state_handle = self.state.clone();
552 let delivery_bus = self.delivery_bus.clone();
553 let runtime = runtime.clone();
554 let id = db_instance_identifier.clone();
555 let engine = engine.clone();
556 let engine_version = engine_version.clone();
557 let master_username = master_username.clone();
558 let master_user_password = master_user_password.clone();
559 let account_id = request.account_id.clone();
560 let region = request.region.clone();
561 let arn = instance_arn.clone();
562 tokio::spawn(async move {
563 match runtime
564 .ensure_postgres(
565 &id,
566 &engine,
567 &engine_version,
568 &master_username,
569 &master_user_password,
570 &logical_db_name,
571 &account_id,
572 ®ion,
573 )
574 .await
575 {
576 Ok(running) => {
577 let mut accounts = state_handle.write();
578 let state = accounts.get_or_create(&account_id);
579 if let Some(inst) = state.instances.get_mut(&id) {
580 inst.db_instance_status = "available".to_string();
581 inst.endpoint_address = "127.0.0.1".to_string();
582 inst.port = i32::from(running.host_port);
583 inst.host_port = running.host_port;
584 inst.container_id = running.container_id;
585 }
586 }
587 Err(error) => {
588 tracing::error!(%error, db_instance_identifier=%id, "create_db_instance background task failed");
589 {
590 let mut accounts = state_handle.write();
591 let state = accounts.get_or_create(&account_id);
592 state.instances.remove(&id);
593 }
594 emit_event_static(
595 delivery_bus.as_ref(),
596 RdsSourceType::DbInstance,
597 &id,
598 &arn,
599 "RDS-EVENT-0058",
600 &["failure"],
601 &format!("DB instance failed to create: {}", error),
602 );
603 }
604 }
605 });
606 }
607
608 Ok(AwsResponse::xml(
609 StatusCode::OK,
610 query_response_xml(
611 "CreateDBInstance",
612 RDS_NS,
613 &format!(
614 "<DBInstance>{}</DBInstance>",
615 db_instance_xml(&instance, None)
616 ),
617 &request.request_id,
618 ),
619 ))
620 }
621
622 async fn delete_db_instance(
623 &self,
624 request: &AwsRequest,
625 ) -> Result<AwsResponse, AwsServiceError> {
626 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
627 let skip_final_snapshot =
628 parse_optional_bool(optional_param(request, "SkipFinalSnapshot").as_deref())?
629 .unwrap_or(false);
630 let final_db_snapshot_identifier = optional_param(request, "FinalDBSnapshotIdentifier");
631
632 if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
633 return Err(AwsServiceError::aws_error(
634 StatusCode::BAD_REQUEST,
635 "InvalidParameterCombination",
636 "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
637 ));
638 }
639 if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
640 return Err(AwsServiceError::aws_error(
641 StatusCode::BAD_REQUEST,
642 "InvalidParameterCombination",
643 "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
644 ));
645 }
646
647 {
649 let accounts = self.state.read();
650 let empty = RdsState::new(&request.account_id, &request.region);
651 let state = accounts.get(&request.account_id).unwrap_or(&empty);
652 if let Some(instance) = state.instances.get(&db_instance_identifier) {
653 if instance.deletion_protection {
654 return Err(AwsServiceError::aws_error(
655 StatusCode::BAD_REQUEST,
656 "InvalidDBInstanceState",
657 format!(
658 "DBInstance {} cannot be deleted because deletion protection is enabled.",
659 db_instance_identifier
660 ),
661 ));
662 }
663 } else {
664 return Err(db_instance_not_found(&db_instance_identifier));
665 }
666 }
667
668 if let Some(ref snapshot_id) = final_db_snapshot_identifier {
669 self.create_final_db_snapshot(
670 &db_instance_identifier,
671 snapshot_id,
672 &request.account_id,
673 &request.region,
674 )
675 .await?;
676 }
677
678 let instance = {
679 let mut accounts = self.state.write();
680 let state = accounts.get_or_create(&request.account_id);
681 let instance = state
682 .instances
683 .remove(&db_instance_identifier)
684 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
685
686 if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
687 if let Some(source) = state.instances.get_mut(source_id) {
688 source
689 .read_replica_db_instance_identifiers
690 .retain(|id| id != &db_instance_identifier);
691 }
692 }
693
694 for replica_id in &instance.read_replica_db_instance_identifiers {
695 if let Some(replica) = state.instances.get_mut(replica_id) {
696 replica.read_replica_source_db_instance_identifier = None;
697 }
698 }
699
700 instance
701 };
702
703 if let Some(runtime) = &self.runtime {
704 runtime.stop_container(&db_instance_identifier).await;
705 }
706
707 self.emit_event(
708 RdsSourceType::DbInstance,
709 &db_instance_identifier,
710 &instance.db_instance_arn,
711 "RDS-EVENT-0003",
712 &["deletion"],
713 "DB instance deleted",
714 );
715
716 Ok(AwsResponse::xml(
717 StatusCode::OK,
718 query_response_xml(
719 "DeleteDBInstance",
720 RDS_NS,
721 &format!(
722 "<DBInstance>{}</DBInstance>",
723 db_instance_xml(&instance, Some("deleting"))
724 ),
725 &request.request_id,
726 ),
727 ))
728 }
729
730 async fn create_final_db_snapshot(
736 &self,
737 db_instance_identifier: &str,
738 snapshot_id: &str,
739 account_id: &str,
740 region: &str,
741 ) -> Result<(), AwsServiceError> {
742 let runtime = self.runtime.as_ref().ok_or_else(|| {
743 AwsServiceError::aws_error(
744 StatusCode::SERVICE_UNAVAILABLE,
745 "InvalidParameterValue",
746 "Docker/Podman is required for RDS snapshots but is not available",
747 )
748 })?;
749
750 let (instance_for_snapshot, db_name) = {
751 let accounts = self.state.read();
752 let empty = RdsState::new(account_id, region);
753 let state = accounts.get(account_id).unwrap_or(&empty);
754
755 if state.snapshots.contains_key(snapshot_id) {
756 return Err(AwsServiceError::aws_error(
757 StatusCode::CONFLICT,
758 "DBSnapshotAlreadyExists",
759 format!("DBSnapshot {snapshot_id} already exists."),
760 ));
761 }
762
763 let instance = state
764 .instances
765 .get(db_instance_identifier)
766 .cloned()
767 .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
768
769 let default_db = default_db_name(&instance.engine);
770 let db_name = instance
771 .db_name
772 .as_deref()
773 .unwrap_or(default_db)
774 .to_string();
775
776 (instance, db_name)
777 };
778
779 let dump_data = runtime
780 .dump_database(
781 db_instance_identifier,
782 &instance_for_snapshot.engine,
783 &instance_for_snapshot.master_username,
784 &instance_for_snapshot.master_user_password,
785 &db_name,
786 )
787 .await
788 .map_err(runtime_error_to_service_error)?;
789
790 let mut accounts = self.state.write();
791 let state = accounts.get_or_create(account_id);
792
793 if state.snapshots.contains_key(snapshot_id) {
794 return Err(AwsServiceError::aws_error(
795 StatusCode::CONFLICT,
796 "DBSnapshotAlreadyExists",
797 format!("DBSnapshot {snapshot_id} already exists."),
798 ));
799 }
800
801 let snapshot_arn = state.db_snapshot_arn(snapshot_id);
802
803 let snapshot = DbSnapshot {
804 db_snapshot_identifier: snapshot_id.to_string(),
805 db_snapshot_arn: snapshot_arn,
806 db_instance_identifier: db_instance_identifier.to_string(),
807 snapshot_create_time: Utc::now(),
808 engine: instance_for_snapshot.engine.clone(),
809 engine_version: instance_for_snapshot.engine_version.clone(),
810 allocated_storage: instance_for_snapshot.allocated_storage,
811 status: "available".to_string(),
812 port: instance_for_snapshot.port,
813 master_username: instance_for_snapshot.master_username.clone(),
814 db_name: instance_for_snapshot.db_name.clone(),
815 dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
816 snapshot_type: "manual".to_string(),
817 master_user_password: instance_for_snapshot.master_user_password.clone(),
818 tags: Vec::new(),
819 dump_data,
820 };
821
822 state.snapshots.insert(snapshot_id.to_string(), snapshot);
823 Ok(())
824 }
825
826 fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
827 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
828 let db_instance_class = optional_param(request, "DBInstanceClass");
829 let deletion_protection =
830 parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?;
831 let apply_immediately =
832 parse_optional_bool(optional_param(request, "ApplyImmediately").as_deref())?;
833
834 let vpc_security_group_ids = {
836 let mut ids = Vec::new();
837 for index in 1.. {
838 let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
839 match optional_param(request, &sg_id_name) {
840 Some(sg_id) => ids.push(sg_id),
841 None => break,
842 }
843 }
844 if ids.is_empty() {
845 None
846 } else {
847 Some(ids)
848 }
849 };
850
851 if db_instance_class.is_none()
852 && deletion_protection.is_none()
853 && vpc_security_group_ids.is_none()
854 {
855 return Err(AwsServiceError::aws_error(
856 StatusCode::BAD_REQUEST,
857 "InvalidParameterCombination",
858 "At least one supported mutable field must be provided.",
859 ));
860 }
861 if let Some(ref class) = db_instance_class {
862 validate_db_instance_class(class)?;
863 }
864
865 let mut accounts = self.state.write();
866 let state = accounts.get_or_create(&request.account_id);
867 let instance = state
868 .instances
869 .get_mut(&db_instance_identifier)
870 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
871
872 if apply_immediately == Some(false) {
874 let pending = instance
875 .pending_modified_values
876 .get_or_insert(Default::default());
877 if let Some(class) = db_instance_class {
878 pending.db_instance_class = Some(class);
879 }
880 if let Some(deletion_protection) = deletion_protection {
883 instance.deletion_protection = deletion_protection;
884 }
885 if let Some(security_group_ids) = vpc_security_group_ids {
886 instance.vpc_security_group_ids = security_group_ids;
887 }
888 } else {
889 if let Some(class) = db_instance_class {
891 instance.db_instance_class = class;
892 }
893 if let Some(deletion_protection) = deletion_protection {
894 instance.deletion_protection = deletion_protection;
895 }
896 if let Some(security_group_ids) = vpc_security_group_ids {
897 instance.vpc_security_group_ids = security_group_ids;
898 }
899 }
900 let instance_arn = instance.db_instance_arn.clone();
901 let xml = query_response_xml(
902 "ModifyDBInstance",
903 RDS_NS,
904 &format!(
905 "<DBInstance>{}</DBInstance>",
906 db_instance_xml(instance, Some("modifying"))
907 ),
908 &request.request_id,
909 );
910 drop(accounts);
911
912 self.emit_event(
913 RdsSourceType::DbInstance,
914 &db_instance_identifier,
915 &instance_arn,
916 "RDS-EVENT-0014",
917 &["configuration change"],
918 "DB instance was modified",
919 );
920
921 Ok(AwsResponse::xml(StatusCode::OK, xml))
922 }
923
924 async fn reboot_db_instance(
925 &self,
926 request: &AwsRequest,
927 ) -> Result<AwsResponse, AwsServiceError> {
928 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
929 let force_failover =
930 parse_optional_bool(optional_param(request, "ForceFailover").as_deref())?;
931 if force_failover == Some(true) {
932 return Err(AwsServiceError::aws_error(
933 StatusCode::BAD_REQUEST,
934 "InvalidParameterCombination",
935 "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
936 ));
937 }
938
939 let instance = {
940 let accounts = self.state.read();
941 let empty = RdsState::new(&request.account_id, &request.region);
942 let state = accounts.get(&request.account_id).unwrap_or(&empty);
943 state
944 .instances
945 .get(&db_instance_identifier)
946 .cloned()
947 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
948 };
949
950 let runtime = self.require_runtime()?;
951
952 let running = runtime
953 .restart_container(
954 &db_instance_identifier,
955 &instance.engine,
956 &instance.master_username,
957 &instance.master_user_password,
958 instance
959 .db_name
960 .as_deref()
961 .unwrap_or(default_db_name(&instance.engine)),
962 )
963 .await
964 .map_err(runtime_error_to_service_error)?;
965
966 let instance = {
967 let mut accounts = self.state.write();
968 let state = accounts.get_or_create(&request.account_id);
969 let instance = state
970 .instances
971 .get_mut(&db_instance_identifier)
972 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
973 instance.host_port = running.host_port;
974 instance.port = i32::from(running.host_port);
975
976 if let Some(pending) = instance.pending_modified_values.take() {
978 if let Some(class) = pending.db_instance_class {
979 instance.db_instance_class = class;
980 }
981 if let Some(allocated_storage) = pending.allocated_storage {
982 instance.allocated_storage = allocated_storage;
983 }
984 if let Some(backup_retention_period) = pending.backup_retention_period {
985 instance.backup_retention_period = backup_retention_period;
986 }
987 if let Some(multi_az) = pending.multi_az {
988 instance.multi_az = multi_az;
989 }
990 if let Some(engine_version) = pending.engine_version {
991 instance.engine_version = engine_version;
992 }
993 if let Some(master_user_password) = pending.master_user_password {
994 instance.master_user_password = master_user_password;
995 }
996 }
997
998 instance.clone()
999 };
1000
1001 self.emit_event(
1002 RdsSourceType::DbInstance,
1003 &db_instance_identifier,
1004 &instance.db_instance_arn,
1005 "RDS-EVENT-0006",
1006 &["availability"],
1007 "DB instance restarted",
1008 );
1009
1010 Ok(AwsResponse::xml(
1011 StatusCode::OK,
1012 query_response_xml(
1013 "RebootDBInstance",
1014 RDS_NS,
1015 &format!(
1016 "<DBInstance>{}</DBInstance>",
1017 db_instance_xml(&instance, Some("rebooting"))
1018 ),
1019 &request.request_id,
1020 ),
1021 ))
1022 }
1023
1024 fn describe_db_engine_versions(
1025 &self,
1026 request: &AwsRequest,
1027 ) -> Result<AwsResponse, AwsServiceError> {
1028 let engine = optional_param(request, "Engine");
1029 let engine_version = optional_param(request, "EngineVersion");
1030 let family = optional_param(request, "DBParameterGroupFamily");
1031 let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
1032
1033 let mut versions = filter_engine_versions(
1034 &default_engine_versions(),
1035 &engine,
1036 &engine_version,
1037 &family,
1038 );
1039
1040 if default_only.unwrap_or(false) {
1041 versions.truncate(1);
1042 }
1043
1044 Ok(AwsResponse::xml(
1045 StatusCode::OK,
1046 query_response_xml(
1047 "DescribeDBEngineVersions",
1048 RDS_NS,
1049 &format!(
1050 "<DBEngineVersions>{}</DBEngineVersions>",
1051 versions.iter().map(engine_version_xml).collect::<String>()
1052 ),
1053 &request.request_id,
1054 ),
1055 ))
1056 }
1057
1058 fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1059 let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1060 let marker = optional_param(request, "Marker");
1061 let max_records = optional_param(request, "MaxRecords");
1062
1063 let accounts = self.state.read();
1064 let empty = RdsState::new(&request.account_id, &request.region);
1065 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1066
1067 if let Some(identifier) = db_instance_identifier {
1069 let instance = state
1070 .instances
1071 .get(&identifier)
1072 .cloned()
1073 .ok_or_else(|| db_instance_not_found(&identifier))?;
1074
1075 return Ok(AwsResponse::xml(
1076 StatusCode::OK,
1077 query_response_xml(
1078 "DescribeDBInstances",
1079 RDS_NS,
1080 &format!(
1081 "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
1082 db_instance_xml(&instance, None)
1083 ),
1084 &request.request_id,
1085 ),
1086 ));
1087 }
1088
1089 let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
1091 instances.sort_by(|a, b| {
1092 a.created_at
1093 .cmp(&b.created_at)
1094 .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
1095 });
1096
1097 let paginated = paginate(instances, marker, max_records, |inst| {
1099 &inst.db_instance_identifier
1100 })?;
1101
1102 let marker_xml = paginated
1103 .next_marker
1104 .as_ref()
1105 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1106 .unwrap_or_default();
1107
1108 Ok(AwsResponse::xml(
1109 StatusCode::OK,
1110 query_response_xml(
1111 "DescribeDBInstances",
1112 RDS_NS,
1113 &format!(
1114 "<DBInstances>{}</DBInstances>{}",
1115 paginated
1116 .items
1117 .iter()
1118 .map(|instance| {
1119 format!(
1120 "<DBInstance>{}</DBInstance>",
1121 db_instance_xml(instance, None)
1122 )
1123 })
1124 .collect::<String>(),
1125 marker_xml
1126 ),
1127 &request.request_id,
1128 ),
1129 ))
1130 }
1131
1132 fn describe_orderable_db_instance_options(
1133 &self,
1134 request: &AwsRequest,
1135 ) -> Result<AwsResponse, AwsServiceError> {
1136 let engine = optional_param(request, "Engine");
1137 let engine_version = optional_param(request, "EngineVersion");
1138 let db_instance_class = optional_param(request, "DBInstanceClass");
1139 let license_model = optional_param(request, "LicenseModel");
1140 let vpc = parse_optional_bool(optional_param(request, "Vpc").as_deref())?;
1141
1142 let options = filter_orderable_options(
1143 &default_orderable_options(),
1144 &engine,
1145 &engine_version,
1146 &db_instance_class,
1147 &license_model,
1148 vpc,
1149 );
1150
1151 Ok(AwsResponse::xml(
1152 StatusCode::OK,
1153 query_response_xml(
1154 "DescribeOrderableDBInstanceOptions",
1155 RDS_NS,
1156 &format!(
1157 "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
1158 options.iter().map(orderable_option_xml).collect::<String>()
1159 ),
1160 &request.request_id,
1161 ),
1162 ))
1163 }
1164
1165 fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1166 let resource_name = required_param(request, "ResourceName")?;
1167 let tags = parse_tags(request)?;
1168
1169 if tags.is_empty() {
1170 return Err(AwsServiceError::aws_error(
1171 StatusCode::BAD_REQUEST,
1172 "MissingParameter",
1173 "The request must contain the parameter Tags.",
1174 ));
1175 }
1176
1177 let mut accounts = self.state.write();
1178 let state = accounts.get_or_create(&request.account_id);
1179 let instance = find_instance_by_arn_mut(state, &resource_name)?;
1180 merge_tags(&mut instance.tags, &tags);
1181
1182 Ok(AwsResponse::xml(
1183 StatusCode::OK,
1184 query_response_xml("AddTagsToResource", RDS_NS, "", &request.request_id),
1185 ))
1186 }
1187
1188 fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1189 let resource_name = required_param(request, "ResourceName")?;
1190 if query_param_prefix_exists(request, "Filters.") {
1191 return Err(AwsServiceError::aws_error(
1192 StatusCode::BAD_REQUEST,
1193 "InvalidParameterValue",
1194 "Filters are not yet supported for ListTagsForResource.",
1195 ));
1196 }
1197
1198 let accounts = self.state.read();
1199 let empty = RdsState::new(&request.account_id, &request.region);
1200 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1201 let instance = find_instance_by_arn(state, &resource_name)?;
1202 let tag_xml = instance.tags.iter().map(tag_xml).collect::<String>();
1203
1204 Ok(AwsResponse::xml(
1205 StatusCode::OK,
1206 query_response_xml(
1207 "ListTagsForResource",
1208 RDS_NS,
1209 &format!("<TagList>{tag_xml}</TagList>"),
1210 &request.request_id,
1211 ),
1212 ))
1213 }
1214
1215 fn remove_tags_from_resource(
1216 &self,
1217 request: &AwsRequest,
1218 ) -> Result<AwsResponse, AwsServiceError> {
1219 let resource_name = required_param(request, "ResourceName")?;
1220 let tag_keys = parse_tag_keys(request)?;
1221
1222 if tag_keys.is_empty() {
1223 return Err(AwsServiceError::aws_error(
1224 StatusCode::BAD_REQUEST,
1225 "MissingParameter",
1226 "The request must contain the parameter TagKeys.",
1227 ));
1228 }
1229
1230 let mut accounts = self.state.write();
1231 let state = accounts.get_or_create(&request.account_id);
1232 let instance = find_instance_by_arn_mut(state, &resource_name)?;
1233 instance
1234 .tags
1235 .retain(|tag| !tag_keys.iter().any(|key| key == &tag.key));
1236
1237 Ok(AwsResponse::xml(
1238 StatusCode::OK,
1239 query_response_xml("RemoveTagsFromResource", RDS_NS, "", &request.request_id),
1240 ))
1241 }
1242
1243 async fn create_db_snapshot(
1244 &self,
1245 request: &AwsRequest,
1246 ) -> Result<AwsResponse, AwsServiceError> {
1247 let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1248 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1249
1250 let runtime = self.runtime.as_ref().ok_or_else(|| {
1251 AwsServiceError::aws_error(
1252 StatusCode::SERVICE_UNAVAILABLE,
1253 "InvalidParameterValue",
1254 "Docker/Podman is required for RDS snapshots but is not available",
1255 )
1256 })?;
1257
1258 let (instance, db_name) = {
1259 let accounts = self.state.read();
1260 let empty = RdsState::new(&request.account_id, &request.region);
1261 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1262
1263 if state.snapshots.contains_key(&db_snapshot_identifier) {
1264 return Err(AwsServiceError::aws_error(
1265 StatusCode::CONFLICT,
1266 "DBSnapshotAlreadyExists",
1267 format!("DBSnapshot {db_snapshot_identifier} already exists."),
1268 ));
1269 }
1270
1271 let instance = state
1272 .instances
1273 .get(&db_instance_identifier)
1274 .cloned()
1275 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1276
1277 let default_db = default_db_name(&instance.engine);
1278 let db_name = instance
1279 .db_name
1280 .as_deref()
1281 .unwrap_or(default_db)
1282 .to_string();
1283
1284 (instance, db_name)
1285 };
1286
1287 let dump_data = runtime
1288 .dump_database(
1289 &db_instance_identifier,
1290 &instance.engine,
1291 &instance.master_username,
1292 &instance.master_user_password,
1293 &db_name,
1294 )
1295 .await
1296 .map_err(runtime_error_to_service_error)?;
1297
1298 let mut accounts = self.state.write();
1299 let state = accounts.get_or_create(&request.account_id);
1300
1301 if state.snapshots.contains_key(&db_snapshot_identifier) {
1302 return Err(AwsServiceError::aws_error(
1303 StatusCode::CONFLICT,
1304 "DBSnapshotAlreadyExists",
1305 format!("DBSnapshot {db_snapshot_identifier} already exists."),
1306 ));
1307 }
1308
1309 let snapshot = DbSnapshot {
1310 db_snapshot_identifier: db_snapshot_identifier.clone(),
1311 db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
1312 db_instance_identifier: instance.db_instance_identifier.clone(),
1313 snapshot_create_time: Utc::now(),
1314 engine: instance.engine.clone(),
1315 engine_version: instance.engine_version.clone(),
1316 allocated_storage: instance.allocated_storage,
1317 status: "available".to_string(),
1318 port: instance.port,
1319 master_username: instance.master_username.clone(),
1320 db_name: instance.db_name.clone(),
1321 dbi_resource_id: instance.dbi_resource_id.clone(),
1322 snapshot_type: "manual".to_string(),
1323 master_user_password: instance.master_user_password.clone(),
1324 tags: Vec::new(),
1325 dump_data,
1326 };
1327
1328 state
1329 .snapshots
1330 .insert(db_snapshot_identifier.clone(), snapshot.clone());
1331 let snapshot_arn = snapshot.db_snapshot_arn.clone();
1332 drop(accounts);
1333
1334 self.emit_event(
1335 RdsSourceType::DbSnapshot,
1336 &db_snapshot_identifier,
1337 &snapshot_arn,
1338 "RDS-EVENT-0042",
1339 &["creation"],
1340 "Manual snapshot created",
1341 );
1342
1343 Ok(AwsResponse::xml(
1344 StatusCode::OK,
1345 query_response_xml(
1346 "CreateDBSnapshot",
1347 RDS_NS,
1348 &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1349 &request.request_id,
1350 ),
1351 ))
1352 }
1353
1354 fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1355 let db_snapshot_identifier = optional_param(request, "DBSnapshotIdentifier");
1356 let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1357 let marker = optional_param(request, "Marker");
1358 let max_records = optional_param(request, "MaxRecords");
1359
1360 if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
1361 return Err(AwsServiceError::aws_error(
1362 StatusCode::BAD_REQUEST,
1363 "InvalidParameterCombination",
1364 "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
1365 ));
1366 }
1367
1368 let accounts = self.state.read();
1369 let empty = RdsState::new(&request.account_id, &request.region);
1370 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1371
1372 if let Some(snapshot_id) = db_snapshot_identifier {
1374 let snapshot = state
1375 .snapshots
1376 .get(&snapshot_id)
1377 .cloned()
1378 .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
1379
1380 return Ok(AwsResponse::xml(
1381 StatusCode::OK,
1382 query_response_xml(
1383 "DescribeDBSnapshots",
1384 RDS_NS,
1385 &format!(
1386 "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
1387 db_snapshot_xml(&snapshot)
1388 ),
1389 &request.request_id,
1390 ),
1391 ));
1392 }
1393
1394 let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
1396 state
1397 .snapshots
1398 .values()
1399 .filter(|s| s.db_instance_identifier == instance_id)
1400 .cloned()
1401 .collect()
1402 } else {
1403 state.snapshots.values().cloned().collect()
1404 };
1405
1406 snapshots.sort_by(|a, b| {
1408 a.snapshot_create_time
1409 .cmp(&b.snapshot_create_time)
1410 .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
1411 });
1412
1413 let paginated = paginate(snapshots, marker, max_records, |snap| {
1415 &snap.db_snapshot_identifier
1416 })?;
1417
1418 let marker_xml = paginated
1419 .next_marker
1420 .as_ref()
1421 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1422 .unwrap_or_default();
1423
1424 Ok(AwsResponse::xml(
1425 StatusCode::OK,
1426 query_response_xml(
1427 "DescribeDBSnapshots",
1428 RDS_NS,
1429 &format!(
1430 "<DBSnapshots>{}</DBSnapshots>{}",
1431 paginated
1432 .items
1433 .iter()
1434 .map(|snapshot| format!(
1435 "<DBSnapshot>{}</DBSnapshot>",
1436 db_snapshot_xml(snapshot)
1437 ))
1438 .collect::<String>(),
1439 marker_xml
1440 ),
1441 &request.request_id,
1442 ),
1443 ))
1444 }
1445
1446 fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1447 let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1448
1449 let mut accounts = self.state.write();
1450 let state = accounts.get_or_create(&request.account_id);
1451
1452 let snapshot = state
1453 .snapshots
1454 .remove(&db_snapshot_identifier)
1455 .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1456 let snapshot_arn = snapshot.db_snapshot_arn.clone();
1457 drop(accounts);
1458
1459 self.emit_event(
1460 RdsSourceType::DbSnapshot,
1461 &db_snapshot_identifier,
1462 &snapshot_arn,
1463 "RDS-EVENT-0041",
1464 &["deletion"],
1465 "Manual snapshot deleted",
1466 );
1467
1468 Ok(AwsResponse::xml(
1469 StatusCode::OK,
1470 query_response_xml(
1471 "DeleteDBSnapshot",
1472 RDS_NS,
1473 &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1474 &request.request_id,
1475 ),
1476 ))
1477 }
1478
1479 async fn restore_db_instance_from_db_snapshot(
1480 &self,
1481 request: &AwsRequest,
1482 ) -> Result<AwsResponse, AwsServiceError> {
1483 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1484 let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1485 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
1486
1487 let runtime = self.require_runtime()?;
1488
1489 let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
1490 let mut accounts = self.state.write();
1491 let state = accounts.get_or_create(&request.account_id);
1492
1493 if !state.begin_instance_creation(&db_instance_identifier) {
1494 return Err(AwsServiceError::aws_error(
1495 StatusCode::CONFLICT,
1496 "DBInstanceAlreadyExists",
1497 format!("DBInstance {db_instance_identifier} already exists."),
1498 ));
1499 }
1500
1501 let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
1502 Some(s) => s,
1503 None => {
1504 state.cancel_instance_creation(&db_instance_identifier);
1505 return Err(db_snapshot_not_found(&db_snapshot_identifier));
1506 }
1507 };
1508
1509 let dbi_resource_id = state.next_dbi_resource_id();
1510 let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
1511 let created_at = Utc::now();
1512
1513 (snapshot, dbi_resource_id, db_instance_arn, created_at)
1514 };
1515
1516 let db_name = snapshot
1517 .db_name
1518 .as_deref()
1519 .unwrap_or(default_db_name(&snapshot.engine));
1520 let running = match runtime
1521 .ensure_postgres(
1522 &db_instance_identifier,
1523 &snapshot.engine,
1524 &snapshot.engine_version,
1525 &snapshot.master_username,
1526 &snapshot.master_user_password,
1527 db_name,
1528 &request.account_id,
1529 &request.region,
1530 )
1531 .await
1532 {
1533 Ok(running) => running,
1534 Err(e) => {
1535 self.state
1536 .write()
1537 .get_or_create(&request.account_id)
1538 .cancel_instance_creation(&db_instance_identifier);
1539 return Err(runtime_error_to_service_error(e));
1540 }
1541 };
1542
1543 if let Err(e) = runtime
1544 .restore_database(
1545 &db_instance_identifier,
1546 &snapshot.engine,
1547 &snapshot.master_username,
1548 &snapshot.master_user_password,
1549 db_name,
1550 &snapshot.dump_data,
1551 )
1552 .await
1553 {
1554 self.state
1555 .write()
1556 .get_or_create(&request.account_id)
1557 .cancel_instance_creation(&db_instance_identifier);
1558 runtime.stop_container(&db_instance_identifier).await;
1559 return Err(runtime_error_to_service_error(e));
1560 }
1561
1562 let instance = build_restored_instance(
1563 &db_instance_identifier,
1564 db_instance_arn,
1565 dbi_resource_id,
1566 created_at,
1567 vpc_security_group_ids,
1568 &snapshot,
1569 &running,
1570 );
1571
1572 self.state
1573 .write()
1574 .get_or_create(&request.account_id)
1575 .finish_instance_creation(instance.clone());
1576
1577 self.emit_event(
1578 RdsSourceType::DbInstance,
1579 &db_instance_identifier,
1580 &instance.db_instance_arn,
1581 "RDS-EVENT-0043",
1582 &["creation"],
1583 "DB instance restored from snapshot",
1584 );
1585
1586 Ok(AwsResponse::xml(
1587 StatusCode::OK,
1588 query_response_xml(
1589 "RestoreDBInstanceFromDBSnapshot",
1590 RDS_NS,
1591 &format!(
1592 "<DBInstance>{}</DBInstance>",
1593 db_instance_xml(&instance, None)
1594 ),
1595 &request.request_id,
1596 ),
1597 ))
1598 }
1599
1600 async fn create_db_instance_read_replica(
1601 &self,
1602 request: &AwsRequest,
1603 ) -> Result<AwsResponse, AwsServiceError> {
1604 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1605 let source_db_instance_identifier = required_param(request, "SourceDBInstanceIdentifier")?;
1606
1607 let runtime = self.runtime.as_ref().ok_or_else(|| {
1608 AwsServiceError::aws_error(
1609 StatusCode::SERVICE_UNAVAILABLE,
1610 "InvalidParameterValue",
1611 "Docker/Podman is required for RDS read replicas but is not available",
1612 )
1613 })?;
1614
1615 let (source_instance, db_name) = {
1616 let mut accounts = self.state.write();
1617 let state = accounts.get_or_create(&request.account_id);
1618
1619 if !state.begin_instance_creation(&db_instance_identifier) {
1620 return Err(AwsServiceError::aws_error(
1621 StatusCode::CONFLICT,
1622 "DBInstanceAlreadyExists",
1623 format!("DBInstance {db_instance_identifier} already exists."),
1624 ));
1625 }
1626
1627 let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
1628 {
1629 Some(inst) => inst,
1630 None => {
1631 state.cancel_instance_creation(&db_instance_identifier);
1632 return Err(db_instance_not_found(&source_db_instance_identifier));
1633 }
1634 };
1635
1636 let default_db = default_db_name(&source_instance.engine);
1637 let db_name = source_instance
1638 .db_name
1639 .as_deref()
1640 .unwrap_or(default_db)
1641 .to_string();
1642
1643 (source_instance, db_name)
1644 };
1645
1646 let dump_data = match runtime
1647 .dump_database(
1648 &source_db_instance_identifier,
1649 &source_instance.engine,
1650 &source_instance.master_username,
1651 &source_instance.master_user_password,
1652 &db_name,
1653 )
1654 .await
1655 {
1656 Ok(data) => data,
1657 Err(e) => {
1658 self.state
1659 .write()
1660 .get_or_create(&request.account_id)
1661 .cancel_instance_creation(&db_instance_identifier);
1662 return Err(runtime_error_to_service_error(e));
1663 }
1664 };
1665
1666 let (dbi_resource_id, db_instance_arn) = {
1667 let accounts = self.state.read();
1668 let empty = RdsState::new(&request.account_id, &request.region);
1669 let s = accounts.get(&request.account_id).unwrap_or(&empty);
1670 (
1671 s.next_dbi_resource_id(),
1672 s.db_instance_arn(&db_instance_identifier),
1673 )
1674 };
1675 let created_at = Utc::now();
1676
1677 let running = match runtime
1678 .ensure_postgres(
1679 &db_instance_identifier,
1680 &source_instance.engine,
1681 &source_instance.engine_version,
1682 &source_instance.master_username,
1683 &source_instance.master_user_password,
1684 &db_name,
1685 &request.account_id,
1686 &request.region,
1687 )
1688 .await
1689 {
1690 Ok(running) => running,
1691 Err(e) => {
1692 self.state
1693 .write()
1694 .get_or_create(&request.account_id)
1695 .cancel_instance_creation(&db_instance_identifier);
1696 return Err(runtime_error_to_service_error(e));
1697 }
1698 };
1699
1700 if let Err(e) = runtime
1701 .restore_database(
1702 &db_instance_identifier,
1703 &source_instance.engine,
1704 &source_instance.master_username,
1705 &source_instance.master_user_password,
1706 &db_name,
1707 &dump_data,
1708 )
1709 .await
1710 {
1711 self.state
1712 .write()
1713 .get_or_create(&request.account_id)
1714 .cancel_instance_creation(&db_instance_identifier);
1715 runtime.stop_container(&db_instance_identifier).await;
1716 return Err(runtime_error_to_service_error(e));
1717 }
1718
1719 let replica = build_read_replica_instance(
1720 &db_instance_identifier,
1721 db_instance_arn,
1722 dbi_resource_id,
1723 created_at,
1724 &source_db_instance_identifier,
1725 &source_instance,
1726 &running,
1727 );
1728
1729 let source_missing = {
1730 let mut accounts = self.state.write();
1731 let state = accounts.get_or_create(&request.account_id);
1732 match state.instances.get_mut(&source_db_instance_identifier) {
1733 Some(source) => {
1734 source
1735 .read_replica_db_instance_identifiers
1736 .push(db_instance_identifier.clone());
1737 state.finish_instance_creation(replica.clone());
1738 false
1739 }
1740 None => {
1741 state.cancel_instance_creation(&db_instance_identifier);
1742 true
1743 }
1744 }
1745 };
1746
1747 if source_missing {
1748 runtime.stop_container(&db_instance_identifier).await;
1749 return Err(db_instance_not_found(&source_db_instance_identifier));
1750 }
1751
1752 self.emit_event(
1753 RdsSourceType::DbInstance,
1754 &db_instance_identifier,
1755 &replica.db_instance_arn,
1756 "RDS-EVENT-0005",
1757 &["creation", "read replica"],
1758 "Read replica DB instance created",
1759 );
1760
1761 Ok(AwsResponse::xml(
1762 StatusCode::OK,
1763 query_response_xml(
1764 "CreateDBInstanceReadReplica",
1765 RDS_NS,
1766 &format!(
1767 "<DBInstance>{}</DBInstance>",
1768 db_instance_xml(&replica, None)
1769 ),
1770 &request.request_id,
1771 ),
1772 ))
1773 }
1774
1775 fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1776 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1777 let db_subnet_group_description = required_param(request, "DBSubnetGroupDescription")?;
1778 let subnet_ids = parse_subnet_ids(request)?;
1779
1780 if subnet_ids.is_empty() {
1781 return Err(AwsServiceError::aws_error(
1782 StatusCode::BAD_REQUEST,
1783 "InvalidParameterValue",
1784 "At least one subnet must be specified.",
1785 ));
1786 }
1787
1788 if subnet_ids.len() < 2 {
1789 return Err(AwsServiceError::aws_error(
1790 StatusCode::BAD_REQUEST,
1791 "DBSubnetGroupDoesNotCoverEnoughAZs",
1792 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1793 ));
1794 }
1795
1796 let mut accounts = self.state.write();
1797 let state = accounts.get_or_create(&request.account_id);
1798
1799 if state.subnet_groups.contains_key(&db_subnet_group_name) {
1800 return Err(AwsServiceError::aws_error(
1801 StatusCode::CONFLICT,
1802 "DBSubnetGroupAlreadyExists",
1803 format!("DBSubnetGroup {db_subnet_group_name} already exists."),
1804 ));
1805 }
1806
1807 let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
1808 let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1809 .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
1810 .collect();
1811
1812 let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1814 if unique_azs.len() < 2 {
1815 return Err(AwsServiceError::aws_error(
1816 StatusCode::BAD_REQUEST,
1817 "DBSubnetGroupDoesNotCoverEnoughAZs",
1818 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1819 ));
1820 }
1821
1822 let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
1823 let tags = parse_tags(request)?;
1824
1825 let subnet_group = DbSubnetGroup {
1826 db_subnet_group_name: db_subnet_group_name.clone(),
1827 db_subnet_group_arn,
1828 db_subnet_group_description,
1829 vpc_id,
1830 subnet_ids,
1831 subnet_availability_zones,
1832 tags,
1833 };
1834
1835 state
1836 .subnet_groups
1837 .insert(db_subnet_group_name, subnet_group.clone());
1838
1839 Ok(AwsResponse::xml(
1840 StatusCode::OK,
1841 query_response_xml(
1842 "CreateDBSubnetGroup",
1843 RDS_NS,
1844 &format!(
1845 "<DBSubnetGroup>{}</DBSubnetGroup>",
1846 db_subnet_group_xml(&subnet_group)
1847 ),
1848 &request.request_id,
1849 ),
1850 ))
1851 }
1852
1853 fn describe_db_subnet_groups(
1854 &self,
1855 request: &AwsRequest,
1856 ) -> Result<AwsResponse, AwsServiceError> {
1857 let db_subnet_group_name = optional_param(request, "DBSubnetGroupName");
1858 let marker = optional_param(request, "Marker");
1859 let max_records = optional_param(request, "MaxRecords");
1860
1861 let accounts = self.state.read();
1862 let empty = RdsState::new(&request.account_id, &request.region);
1863 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1864
1865 if let Some(name) = db_subnet_group_name {
1867 let sg = state.subnet_groups.get(&name).ok_or_else(|| {
1868 AwsServiceError::aws_error(
1869 StatusCode::NOT_FOUND,
1870 "DBSubnetGroupNotFoundFault",
1871 format!("DBSubnetGroup {} not found.", name),
1872 )
1873 })?;
1874
1875 return Ok(AwsResponse::xml(
1876 StatusCode::OK,
1877 query_response_xml(
1878 "DescribeDBSubnetGroups",
1879 RDS_NS,
1880 &format!(
1881 "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
1882 db_subnet_group_xml(sg)
1883 ),
1884 &request.request_id,
1885 ),
1886 ));
1887 }
1888
1889 let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
1891 subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
1892
1893 let paginated = paginate(subnet_groups, marker, max_records, |sg| {
1895 &sg.db_subnet_group_name
1896 })?;
1897
1898 let marker_xml = paginated
1899 .next_marker
1900 .as_ref()
1901 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1902 .unwrap_or_default();
1903
1904 let body = paginated
1905 .items
1906 .iter()
1907 .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
1908 .collect::<Vec<_>>()
1909 .join("");
1910
1911 Ok(AwsResponse::xml(
1912 StatusCode::OK,
1913 query_response_xml(
1914 "DescribeDBSubnetGroups",
1915 RDS_NS,
1916 &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
1917 &request.request_id,
1918 ),
1919 ))
1920 }
1921
1922 fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1923 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1924
1925 let mut accounts = self.state.write();
1926 let state = accounts.get_or_create(&request.account_id);
1927
1928 if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
1929 return Err(AwsServiceError::aws_error(
1930 StatusCode::NOT_FOUND,
1931 "DBSubnetGroupNotFoundFault",
1932 format!("DBSubnetGroup {db_subnet_group_name} not found."),
1933 ));
1934 }
1935
1936 Ok(AwsResponse::xml(
1937 StatusCode::OK,
1938 query_response_xml("DeleteDBSubnetGroup", RDS_NS, "", &request.request_id),
1939 ))
1940 }
1941
1942 fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1943 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1944 let subnet_ids = parse_subnet_ids(request)?;
1945
1946 if subnet_ids.is_empty() {
1947 return Err(AwsServiceError::aws_error(
1948 StatusCode::BAD_REQUEST,
1949 "InvalidParameterValue",
1950 "At least one subnet must be specified.",
1951 ));
1952 }
1953
1954 if subnet_ids.len() < 2 {
1955 return Err(AwsServiceError::aws_error(
1956 StatusCode::BAD_REQUEST,
1957 "DBSubnetGroupDoesNotCoverEnoughAZs",
1958 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1959 ));
1960 }
1961
1962 let mut accounts = self.state.write();
1963 let state = accounts.get_or_create(&request.account_id);
1964
1965 let region = state.region.clone();
1966
1967 let subnet_group = state
1968 .subnet_groups
1969 .get_mut(&db_subnet_group_name)
1970 .ok_or_else(|| {
1971 AwsServiceError::aws_error(
1972 StatusCode::NOT_FOUND,
1973 "DBSubnetGroupNotFoundFault",
1974 format!("DBSubnetGroup {db_subnet_group_name} not found."),
1975 )
1976 })?;
1977
1978 let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1979 .map(|i| format!("{}{}", ®ion, char::from(b'a' + (i % 6) as u8)))
1980 .collect();
1981
1982 let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1984 if unique_azs.len() < 2 {
1985 return Err(AwsServiceError::aws_error(
1986 StatusCode::BAD_REQUEST,
1987 "DBSubnetGroupDoesNotCoverEnoughAZs",
1988 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1989 ));
1990 }
1991
1992 subnet_group.subnet_ids = subnet_ids;
1993 subnet_group.subnet_availability_zones = subnet_availability_zones;
1994
1995 let subnet_group_clone = subnet_group.clone();
1996
1997 Ok(AwsResponse::xml(
1998 StatusCode::OK,
1999 query_response_xml(
2000 "ModifyDBSubnetGroup",
2001 RDS_NS,
2002 &format!(
2003 "<DBSubnetGroup>{}</DBSubnetGroup>",
2004 db_subnet_group_xml(&subnet_group_clone)
2005 ),
2006 &request.request_id,
2007 ),
2008 ))
2009 }
2010
2011 fn create_db_parameter_group(
2012 &self,
2013 request: &AwsRequest,
2014 ) -> Result<AwsResponse, AwsServiceError> {
2015 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2016 let db_parameter_group_family = required_param(request, "DBParameterGroupFamily")?;
2017 let description = required_param(request, "Description")?;
2018
2019 let valid_families = [
2021 "postgres16",
2022 "postgres15",
2023 "postgres14",
2024 "postgres13",
2025 "mysql8.0",
2026 "mysql5.7",
2027 "mariadb10.11",
2028 "mariadb10.6",
2029 ];
2030
2031 if !valid_families.contains(&db_parameter_group_family.as_str()) {
2032 return Err(AwsServiceError::aws_error(
2033 StatusCode::BAD_REQUEST,
2034 "InvalidParameterValue",
2035 format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
2036 ));
2037 }
2038
2039 let mut accounts = self.state.write();
2040 let state = accounts.get_or_create(&request.account_id);
2041
2042 if state
2043 .parameter_groups
2044 .contains_key(&db_parameter_group_name)
2045 {
2046 return Err(AwsServiceError::aws_error(
2047 StatusCode::CONFLICT,
2048 "DBParameterGroupAlreadyExists",
2049 format!("DBParameterGroup {db_parameter_group_name} already exists."),
2050 ));
2051 }
2052
2053 let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
2054 let tags = parse_tags(request)?;
2055
2056 let parameter_group = DbParameterGroup {
2057 db_parameter_group_name: db_parameter_group_name.clone(),
2058 db_parameter_group_arn,
2059 db_parameter_group_family,
2060 description,
2061 parameters: std::collections::BTreeMap::new(),
2062 tags,
2063 };
2064
2065 state
2066 .parameter_groups
2067 .insert(db_parameter_group_name, parameter_group.clone());
2068
2069 Ok(AwsResponse::xml(
2070 StatusCode::OK,
2071 query_response_xml(
2072 "CreateDBParameterGroup",
2073 RDS_NS,
2074 &format!(
2075 "<DBParameterGroup>{}</DBParameterGroup>",
2076 db_parameter_group_xml(¶meter_group)
2077 ),
2078 &request.request_id,
2079 ),
2080 ))
2081 }
2082
2083 fn describe_db_parameter_groups(
2084 &self,
2085 request: &AwsRequest,
2086 ) -> Result<AwsResponse, AwsServiceError> {
2087 let db_parameter_group_name = optional_param(request, "DBParameterGroupName");
2088 let marker = optional_param(request, "Marker");
2089 let max_records = optional_param(request, "MaxRecords");
2090
2091 let accounts = self.state.read();
2092 let empty = RdsState::new(&request.account_id, &request.region);
2093 let state = accounts.get(&request.account_id).unwrap_or(&empty);
2094
2095 if let Some(name) = db_parameter_group_name {
2097 let pg = state.parameter_groups.get(&name).ok_or_else(|| {
2098 AwsServiceError::aws_error(
2099 StatusCode::NOT_FOUND,
2100 "DBParameterGroupNotFound",
2101 format!("DBParameterGroup {} not found.", name),
2102 )
2103 })?;
2104
2105 return Ok(AwsResponse::xml(
2106 StatusCode::OK,
2107 query_response_xml(
2108 "DescribeDBParameterGroups", RDS_NS,
2109 &format!(
2110 "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
2111 db_parameter_group_xml(pg)
2112 ),
2113 &request.request_id,
2114 ),
2115 ));
2116 }
2117
2118 let mut parameter_groups: Vec<DbParameterGroup> =
2120 state.parameter_groups.values().cloned().collect();
2121 parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
2122
2123 let paginated = paginate(parameter_groups, marker, max_records, |pg| {
2125 &pg.db_parameter_group_name
2126 })?;
2127
2128 let marker_xml = paginated
2129 .next_marker
2130 .as_ref()
2131 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
2132 .unwrap_or_default();
2133
2134 let body = paginated
2135 .items
2136 .iter()
2137 .map(|pg| {
2138 format!(
2139 "<DBParameterGroup>{}</DBParameterGroup>",
2140 db_parameter_group_xml(pg)
2141 )
2142 })
2143 .collect::<Vec<_>>()
2144 .join("");
2145
2146 Ok(AwsResponse::xml(
2147 StatusCode::OK,
2148 query_response_xml(
2149 "DescribeDBParameterGroups",
2150 RDS_NS,
2151 &format!(
2152 "<DBParameterGroups>{}</DBParameterGroups>{}",
2153 body, marker_xml
2154 ),
2155 &request.request_id,
2156 ),
2157 ))
2158 }
2159
2160 fn delete_db_parameter_group(
2161 &self,
2162 request: &AwsRequest,
2163 ) -> Result<AwsResponse, AwsServiceError> {
2164 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2165
2166 let mut accounts = self.state.write();
2167 let state = accounts.get_or_create(&request.account_id);
2168
2169 if db_parameter_group_name.starts_with("default.") {
2170 return Err(AwsServiceError::aws_error(
2171 StatusCode::BAD_REQUEST,
2172 "InvalidParameterValue",
2173 "Cannot delete default parameter groups.",
2174 ));
2175 }
2176
2177 if state
2178 .parameter_groups
2179 .remove(&db_parameter_group_name)
2180 .is_none()
2181 {
2182 return Err(AwsServiceError::aws_error(
2183 StatusCode::NOT_FOUND,
2184 "DBParameterGroupNotFound",
2185 format!("DBParameterGroup {db_parameter_group_name} not found."),
2186 ));
2187 }
2188
2189 Ok(AwsResponse::xml(
2190 StatusCode::OK,
2191 query_response_xml("DeleteDBParameterGroup", RDS_NS, "", &request.request_id),
2192 ))
2193 }
2194
2195 fn modify_db_parameter_group(
2196 &self,
2197 request: &AwsRequest,
2198 ) -> Result<AwsResponse, AwsServiceError> {
2199 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2200
2201 let mut accounts = self.state.write();
2202 let state = accounts.get_or_create(&request.account_id);
2203
2204 let parameter_group = state
2205 .parameter_groups
2206 .get_mut(&db_parameter_group_name)
2207 .ok_or_else(|| {
2208 AwsServiceError::aws_error(
2209 StatusCode::NOT_FOUND,
2210 "DBParameterGroupNotFound",
2211 format!("DBParameterGroup {db_parameter_group_name} not found."),
2212 )
2213 })?;
2214
2215 if let Some(new_description) = optional_param(request, "Description") {
2216 parameter_group.description = new_description;
2217 }
2218
2219 let parameter_group_clone = parameter_group.clone();
2220
2221 Ok(AwsResponse::xml(
2222 StatusCode::OK,
2223 query_response_xml(
2224 "ModifyDBParameterGroup",
2225 RDS_NS,
2226 &format!(
2227 "<DBParameterGroupName>{}</DBParameterGroupName>",
2228 xml_escape(¶meter_group_clone.db_parameter_group_name)
2229 ),
2230 &request.request_id,
2231 ),
2232 ))
2233 }
2234}
2235
2236fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
2237 fakecloud_core::query::optional_query_param(req, name)
2238}
2239
2240fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
2241 fakecloud_core::query::required_query_param(req, name)
2242}
2243
2244fn required_i32_param(req: &AwsRequest, name: &str) -> Result<i32, AwsServiceError> {
2245 let value = required_param(req, name)?;
2246 value.parse::<i32>().map_err(|_| {
2247 AwsServiceError::aws_error(
2248 StatusCode::BAD_REQUEST,
2249 "InvalidParameterValue",
2250 format!("Parameter {name} must be a valid integer."),
2251 )
2252 })
2253}
2254
2255fn optional_i32_param(req: &AwsRequest, name: &str) -> Result<Option<i32>, AwsServiceError> {
2256 optional_param(req, name)
2257 .map(|value| {
2258 value.parse::<i32>().map_err(|_| {
2259 AwsServiceError::aws_error(
2260 StatusCode::BAD_REQUEST,
2261 "InvalidParameterValue",
2262 format!("Parameter {name} must be a valid integer."),
2263 )
2264 })
2265 })
2266 .transpose()
2267}
2268
2269fn parse_tags(req: &AwsRequest) -> Result<Vec<RdsTag>, AwsServiceError> {
2270 let mut tags = Vec::new();
2271 for index in 1.. {
2272 let key_name = format!("Tags.Tag.{index}.Key");
2273 let value_name = format!("Tags.Tag.{index}.Value");
2274 let key = optional_param(req, &key_name);
2275 let value = optional_param(req, &value_name);
2276
2277 match (key, value) {
2278 (Some(key), Some(value)) => tags.push(RdsTag { key, value }),
2279 (None, None) => break,
2280 _ => {
2281 return Err(AwsServiceError::aws_error(
2282 StatusCode::BAD_REQUEST,
2283 "InvalidParameterValue",
2284 "Each tag must include both Key and Value.",
2285 ));
2286 }
2287 }
2288 }
2289
2290 Ok(tags)
2291}
2292
2293fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
2294 let mut keys = Vec::new();
2295 for index in 1.. {
2296 let key_name = format!("TagKeys.member.{index}");
2297 match optional_param(req, &key_name) {
2298 Some(key) => keys.push(key),
2299 None => break,
2300 }
2301 }
2302
2303 Ok(keys)
2304}
2305
2306fn parse_subnet_ids(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
2307 let mut subnet_ids = Vec::new();
2308 for index in 1.. {
2309 let subnet_id_name = format!("SubnetIds.SubnetIdentifier.{index}");
2310 match optional_param(req, &subnet_id_name) {
2311 Some(subnet_id) => subnet_ids.push(subnet_id),
2312 None => break,
2313 }
2314 }
2315
2316 Ok(subnet_ids)
2317}
2318
2319fn parse_vpc_security_group_ids(req: &AwsRequest) -> Vec<String> {
2320 let mut security_group_ids = Vec::new();
2321 for index in 1.. {
2322 let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
2323 match optional_param(req, &sg_id_name) {
2324 Some(sg_id) => security_group_ids.push(sg_id),
2325 None => break,
2326 }
2327 }
2328
2329 if security_group_ids.is_empty() {
2331 security_group_ids.push("sg-default".to_string());
2332 }
2333
2334 security_group_ids
2335}
2336
2337fn query_param_prefix_exists(req: &AwsRequest, prefix: &str) -> bool {
2338 req.query_params.keys().any(|key| key.starts_with(prefix))
2339}
2340
2341fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
2342 value
2343 .map(|raw| match raw {
2344 "true" | "True" | "TRUE" => Ok(true),
2345 "false" | "False" | "FALSE" => Ok(false),
2346 _ => Err(AwsServiceError::aws_error(
2347 StatusCode::BAD_REQUEST,
2348 "InvalidParameterValue",
2349 format!("Boolean parameter value '{raw}' is invalid."),
2350 )),
2351 })
2352 .transpose()
2353}
2354
2355struct PaginationResult<T> {
2356 items: Vec<T>,
2357 next_marker: Option<String>,
2358}
2359
2360fn paginate<T, F>(
2361 mut items: Vec<T>,
2362 marker: Option<String>,
2363 max_records: Option<String>,
2364 get_id: F,
2365) -> Result<PaginationResult<T>, AwsServiceError>
2366where
2367 F: Fn(&T) -> &str,
2368{
2369 let max = if let Some(max_str) = max_records {
2371 let parsed = max_str.parse::<i32>().map_err(|_| {
2372 AwsServiceError::aws_error(
2373 StatusCode::BAD_REQUEST,
2374 "InvalidParameterValue",
2375 "MaxRecords must be a valid integer.",
2376 )
2377 })?;
2378 if !(1..=100).contains(&parsed) {
2379 return Err(AwsServiceError::aws_error(
2380 StatusCode::BAD_REQUEST,
2381 "InvalidParameterValue",
2382 "MaxRecords must be between 1 and 100.",
2383 ));
2384 }
2385 parsed as usize
2386 } else {
2387 100
2388 };
2389
2390 let start_id = if let Some(encoded_marker) = marker {
2392 let decoded = BASE64.decode(encoded_marker.as_bytes()).map_err(|_| {
2393 AwsServiceError::aws_error(
2394 StatusCode::BAD_REQUEST,
2395 "InvalidParameterValue",
2396 "Marker is invalid.",
2397 )
2398 })?;
2399 let id = String::from_utf8(decoded).map_err(|_| {
2400 AwsServiceError::aws_error(
2401 StatusCode::BAD_REQUEST,
2402 "InvalidParameterValue",
2403 "Marker is invalid.",
2404 )
2405 })?;
2406 Some(id)
2407 } else {
2408 None
2409 };
2410
2411 let start_index = if let Some(ref start_id) = start_id {
2413 items
2414 .iter()
2415 .position(|item| get_id(item) == start_id)
2416 .map(|pos| pos + 1) .unwrap_or(items.len()) } else {
2419 0
2420 };
2421
2422 let total_items = items.len();
2424 let end_index = std::cmp::min(start_index + max, total_items);
2425 let paginated_items: Vec<T> = items.drain(start_index..end_index).collect();
2426
2427 let next_marker = if end_index < total_items {
2429 paginated_items
2430 .last()
2431 .map(|item| BASE64.encode(get_id(item).as_bytes()))
2432 } else {
2433 None
2434 };
2435
2436 Ok(PaginationResult {
2437 items: paginated_items,
2438 next_marker,
2439 })
2440}
2441
2442fn validate_create_request(
2443 db_instance_identifier: &str,
2444 allocated_storage: i32,
2445 db_instance_class: &str,
2446 engine: &str,
2447 engine_version: &str,
2448 port: i32,
2449) -> Result<(), AwsServiceError> {
2450 if allocated_storage <= 0 {
2451 return Err(AwsServiceError::aws_error(
2452 StatusCode::BAD_REQUEST,
2453 "InvalidParameterValue",
2454 "AllocatedStorage must be greater than zero.",
2455 ));
2456 }
2457 if port <= 0 {
2458 return Err(AwsServiceError::aws_error(
2459 StatusCode::BAD_REQUEST,
2460 "InvalidParameterValue",
2461 "Port must be greater than zero.",
2462 ));
2463 }
2464 if !db_instance_identifier
2465 .chars()
2466 .all(|ch| ch.is_ascii_alphanumeric() || ch == '-')
2467 {
2468 return Err(AwsServiceError::aws_error(
2469 StatusCode::BAD_REQUEST,
2470 "InvalidParameterValue",
2471 "DBInstanceIdentifier must contain only alphanumeric characters or hyphens.",
2472 ));
2473 }
2474 let supported_engines = [
2476 "postgres",
2477 "mysql",
2478 "mariadb",
2479 "oracle-ee",
2480 "oracle-se2",
2481 "oracle-ee-cdb",
2482 "oracle-se2-cdb",
2483 "sqlserver-ee",
2484 "sqlserver-se",
2485 "sqlserver-ex",
2486 "sqlserver-web",
2487 "db2-se",
2488 "db2-ae",
2489 ];
2490 if !supported_engines.contains(&engine) {
2491 return Err(AwsServiceError::aws_error(
2492 StatusCode::BAD_REQUEST,
2493 "InvalidParameterValue",
2494 format!("Engine '{}' is not supported.", engine),
2495 ));
2496 }
2497
2498 let supported_versions = match engine {
2507 "postgres" => vec!["16", "15", "14", "13", "16.3", "15.5", "14.10", "13.13"],
2508 "mysql" => vec!["8.0", "8.0.35", "8.0.28", "5.7.44"],
2509 "mariadb" => vec!["10.6", "10.11", "11.4", "11.4.5", "10.11.6", "10.6.16"],
2510 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
2511 vec!["23.0.0", "21.0.0", "19.0.0"]
2512 }
2513 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
2514 vec!["16.00.4085.2.v1", "15.00.4322.2.v1"]
2515 }
2516 "db2-se" | "db2-ae" => vec!["11.5.9.0.sb00000000.r1", "11.5.8.0.sb00000000.r1"],
2517 _ => vec![],
2518 };
2519
2520 if !supported_versions.contains(&engine_version) {
2521 return Err(AwsServiceError::aws_error(
2522 StatusCode::BAD_REQUEST,
2523 "InvalidParameterValue",
2524 format!("EngineVersion '{engine_version}' is not supported yet."),
2525 ));
2526 }
2527 validate_db_instance_class(db_instance_class)?;
2528 Ok(())
2529}
2530
2531fn validate_db_instance_class(db_instance_class: &str) -> Result<(), AwsServiceError> {
2532 if !crate::state::SUPPORTED_INSTANCE_CLASSES.contains(&db_instance_class) {
2533 return Err(AwsServiceError::aws_error(
2534 StatusCode::BAD_REQUEST,
2535 "InvalidParameterValue",
2536 format!("DBInstanceClass '{}' is not supported.", db_instance_class),
2537 ));
2538 }
2539 Ok(())
2540}
2541
2542fn filter_engine_versions(
2543 versions: &[EngineVersionInfo],
2544 engine: &Option<String>,
2545 engine_version: &Option<String>,
2546 family: &Option<String>,
2547) -> Vec<EngineVersionInfo> {
2548 versions
2549 .iter()
2550 .filter(|candidate| {
2551 engine
2552 .as_ref()
2553 .is_none_or(|expected| candidate.engine == *expected)
2554 })
2555 .filter(|candidate| {
2556 engine_version
2557 .as_ref()
2558 .is_none_or(|expected| candidate.engine_version == *expected)
2559 })
2560 .filter(|candidate| {
2561 family
2562 .as_ref()
2563 .is_none_or(|expected| candidate.db_parameter_group_family == *expected)
2564 })
2565 .cloned()
2566 .collect()
2567}
2568
2569fn filter_orderable_options(
2570 options: &[OrderableDbInstanceOption],
2571 engine: &Option<String>,
2572 engine_version: &Option<String>,
2573 db_instance_class: &Option<String>,
2574 license_model: &Option<String>,
2575 vpc: Option<bool>,
2576) -> Vec<OrderableDbInstanceOption> {
2577 options
2578 .iter()
2579 .filter(|candidate| {
2580 engine
2581 .as_ref()
2582 .is_none_or(|expected| candidate.engine == *expected)
2583 })
2584 .filter(|candidate| {
2585 engine_version
2586 .as_ref()
2587 .is_none_or(|expected| candidate.engine_version == *expected)
2588 })
2589 .filter(|candidate| {
2590 db_instance_class
2591 .as_ref()
2592 .is_none_or(|expected| candidate.db_instance_class == *expected)
2593 })
2594 .filter(|candidate| {
2595 license_model
2596 .as_ref()
2597 .is_none_or(|expected| candidate.license_model == *expected)
2598 })
2599 .filter(|_| vpc.unwrap_or(true))
2600 .cloned()
2601 .collect()
2602}
2603
2604#[allow(clippy::too_many_arguments)]
2608fn build_restored_instance(
2612 db_instance_identifier: &str,
2613 db_instance_arn: String,
2614 dbi_resource_id: String,
2615 created_at: chrono::DateTime<Utc>,
2616 vpc_security_group_ids: Vec<String>,
2617 snapshot: &DbSnapshot,
2618 running: &crate::runtime::RunningDbContainer,
2619) -> DbInstance {
2620 DbInstance {
2621 db_instance_identifier: db_instance_identifier.to_string(),
2622 db_instance_arn,
2623 db_instance_class: "db.t3.micro".to_string(),
2624 engine: snapshot.engine.clone(),
2625 engine_version: snapshot.engine_version.clone(),
2626 db_instance_status: "available".to_string(),
2627 master_username: snapshot.master_username.clone(),
2628 db_name: snapshot.db_name.clone(),
2629 endpoint_address: "127.0.0.1".to_string(),
2630 port: i32::from(running.host_port),
2631 allocated_storage: snapshot.allocated_storage,
2632 publicly_accessible: true,
2633 deletion_protection: false,
2634 created_at,
2635 dbi_resource_id,
2636 master_user_password: snapshot.master_user_password.clone(),
2637 container_id: running.container_id.clone(),
2638 host_port: running.host_port,
2639 tags: Vec::new(),
2640 read_replica_source_db_instance_identifier: None,
2641 read_replica_db_instance_identifiers: Vec::new(),
2642 vpc_security_group_ids,
2643 db_parameter_group_name: None,
2644 backup_retention_period: 1,
2645 preferred_backup_window: "03:00-04:00".to_string(),
2646 latest_restorable_time: Some(created_at),
2647 option_group_name: None,
2648 multi_az: false,
2649 pending_modified_values: None,
2650 }
2651}
2652
2653fn build_read_replica_instance(
2654 db_instance_identifier: &str,
2655 db_instance_arn: String,
2656 dbi_resource_id: String,
2657 created_at: chrono::DateTime<Utc>,
2658 source_db_instance_identifier: &str,
2659 source: &DbInstance,
2660 running: &crate::runtime::RunningDbContainer,
2661) -> DbInstance {
2662 DbInstance {
2663 db_instance_identifier: db_instance_identifier.to_string(),
2664 db_instance_arn,
2665 db_instance_class: source.db_instance_class.clone(),
2666 engine: source.engine.clone(),
2667 engine_version: source.engine_version.clone(),
2668 db_instance_status: "available".to_string(),
2669 master_username: source.master_username.clone(),
2670 db_name: source.db_name.clone(),
2671 endpoint_address: "127.0.0.1".to_string(),
2672 port: i32::from(running.host_port),
2673 allocated_storage: source.allocated_storage,
2674 publicly_accessible: source.publicly_accessible,
2675 deletion_protection: false,
2676 created_at,
2677 dbi_resource_id,
2678 master_user_password: source.master_user_password.clone(),
2679 container_id: running.container_id.clone(),
2680 host_port: running.host_port,
2681 tags: Vec::new(),
2682 read_replica_source_db_instance_identifier: Some(source_db_instance_identifier.to_string()),
2683 read_replica_db_instance_identifiers: Vec::new(),
2684 vpc_security_group_ids: source.vpc_security_group_ids.clone(),
2685 db_parameter_group_name: source.db_parameter_group_name.clone(),
2686 backup_retention_period: source.backup_retention_period,
2687 preferred_backup_window: source.preferred_backup_window.clone(),
2688 latest_restorable_time: if source.backup_retention_period > 0 {
2689 Some(created_at)
2690 } else {
2691 None
2692 },
2693 option_group_name: source.option_group_name.clone(),
2694 multi_az: source.multi_az,
2695 pending_modified_values: None,
2696 }
2697}
2698
2699fn engine_version_xml(version: &EngineVersionInfo) -> String {
2700 format!(
2701 "<DBEngineVersion>\
2702 <Engine>{}</Engine>\
2703 <EngineVersion>{}</EngineVersion>\
2704 <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2705 <DBEngineDescription>{}</DBEngineDescription>\
2706 <DBEngineVersionDescription>{}</DBEngineVersionDescription>\
2707 <Status>{}</Status>\
2708 </DBEngineVersion>",
2709 xml_escape(&version.engine),
2710 xml_escape(&version.engine_version),
2711 xml_escape(&version.db_parameter_group_family),
2712 xml_escape(&version.db_engine_description),
2713 xml_escape(&version.db_engine_version_description),
2714 xml_escape(&version.status),
2715 )
2716}
2717
2718fn orderable_option_xml(option: &OrderableDbInstanceOption) -> String {
2719 format!(
2720 "<OrderableDBInstanceOption>\
2721 <Engine>{}</Engine>\
2722 <EngineVersion>{}</EngineVersion>\
2723 <DBInstanceClass>{}</DBInstanceClass>\
2724 <LicenseModel>{}</LicenseModel>\
2725 <AvailabilityZones><AvailabilityZone><Name>us-east-1a</Name></AvailabilityZone></AvailabilityZones>\
2726 <MultiAZCapable>true</MultiAZCapable>\
2727 <ReadReplicaCapable>true</ReadReplicaCapable>\
2728 <Vpc>true</Vpc>\
2729 <SupportsStorageEncryption>true</SupportsStorageEncryption>\
2730 <StorageType>{}</StorageType>\
2731 <SupportsIops>false</SupportsIops>\
2732 <MinStorageSize>{}</MinStorageSize>\
2733 <MaxStorageSize>{}</MaxStorageSize>\
2734 <SupportsIAMDatabaseAuthentication>true</SupportsIAMDatabaseAuthentication>\
2735 </OrderableDBInstanceOption>",
2736 xml_escape(&option.engine),
2737 xml_escape(&option.engine_version),
2738 xml_escape(&option.db_instance_class),
2739 xml_escape(&option.license_model),
2740 xml_escape(&option.storage_type),
2741 option.min_storage_size,
2742 option.max_storage_size,
2743 )
2744}
2745
2746fn tag_xml(tag: &RdsTag) -> String {
2747 format!(
2748 "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2749 xml_escape(&tag.key),
2750 xml_escape(&tag.value),
2751 )
2752}
2753
2754pub(crate) fn emit_event_static(
2757 delivery_bus: Option<&Arc<DeliveryBus>>,
2758 source_type: RdsSourceType,
2759 source_identifier: &str,
2760 source_arn: &str,
2761 event_id: &str,
2762 event_categories: &[&str],
2763 message: &str,
2764) {
2765 let Some(bus) = delivery_bus else {
2766 return;
2767 };
2768 let detail = serde_json::json!({
2769 "EventCategories": event_categories,
2770 "SourceType": source_type.as_str(),
2771 "SourceArn": source_arn,
2772 "Date": Utc::now().to_rfc3339(),
2773 "Message": message,
2774 "SourceIdentifier": source_identifier,
2775 "EventID": event_id,
2776 });
2777 bus.put_event_to_eventbridge(
2778 "aws.rds",
2779 source_type.detail_type(),
2780 &detail.to_string(),
2781 "default",
2782 );
2783}
2784
2785fn db_instance_xml(instance: &DbInstance, status_override: Option<&str>) -> String {
2786 let status = status_override.unwrap_or(&instance.db_instance_status);
2787 let db_name_xml = instance
2788 .db_name
2789 .as_ref()
2790 .map(|db_name| format!("<DBName>{}</DBName>", xml_escape(db_name)))
2791 .unwrap_or_default();
2792
2793 let read_replica_source_xml = instance
2794 .read_replica_source_db_instance_identifier
2795 .as_ref()
2796 .map(|source| {
2797 format!(
2798 "<ReadReplicaSourceDBInstanceIdentifier>{}</ReadReplicaSourceDBInstanceIdentifier>",
2799 xml_escape(source)
2800 )
2801 })
2802 .unwrap_or_default();
2803
2804 let read_replica_identifiers_xml = if instance.read_replica_db_instance_identifiers.is_empty() {
2805 "<ReadReplicaDBInstanceIdentifiers/>".to_string()
2806 } else {
2807 format!(
2808 "<ReadReplicaDBInstanceIdentifiers>{}</ReadReplicaDBInstanceIdentifiers>",
2809 instance
2810 .read_replica_db_instance_identifiers
2811 .iter()
2812 .map(|id| format!(
2813 "<ReadReplicaDBInstanceIdentifier>{}</ReadReplicaDBInstanceIdentifier>",
2814 xml_escape(id)
2815 ))
2816 .collect::<String>()
2817 )
2818 };
2819
2820 let vpc_security_groups_xml = if instance.vpc_security_group_ids.is_empty() {
2821 "<VpcSecurityGroups/>".to_string()
2822 } else {
2823 format!(
2824 "<VpcSecurityGroups>{}</VpcSecurityGroups>",
2825 instance
2826 .vpc_security_group_ids
2827 .iter()
2828 .map(|sg_id| format!(
2829 "<VpcSecurityGroupMembership>\
2830 <VpcSecurityGroupId>{}</VpcSecurityGroupId>\
2831 <Status>active</Status>\
2832 </VpcSecurityGroupMembership>",
2833 xml_escape(sg_id)
2834 ))
2835 .collect::<String>()
2836 )
2837 };
2838
2839 let db_parameter_groups_xml = match &instance.db_parameter_group_name {
2840 Some(pg_name) => format!(
2841 "<DBParameterGroups>\
2842 <DBParameterGroup>\
2843 <DBParameterGroupName>{}</DBParameterGroupName>\
2844 <ParameterApplyStatus>in-sync</ParameterApplyStatus>\
2845 </DBParameterGroup>\
2846 </DBParameterGroups>",
2847 xml_escape(pg_name)
2848 ),
2849 None => "<DBParameterGroups/>".to_string(),
2850 };
2851
2852 let option_group_memberships_xml = match &instance.option_group_name {
2853 Some(og_name) => format!(
2854 "<OptionGroupMemberships>\
2855 <OptionGroupMembership>\
2856 <OptionGroupName>{}</OptionGroupName>\
2857 <Status>in-sync</Status>\
2858 </OptionGroupMembership>\
2859 </OptionGroupMemberships>",
2860 xml_escape(og_name)
2861 ),
2862 None => "<OptionGroupMemberships/>".to_string(),
2863 };
2864
2865 let pending_modified_values_xml = if let Some(ref pending) = instance.pending_modified_values {
2866 let mut fields = Vec::new();
2867 if let Some(ref class) = pending.db_instance_class {
2868 fields.push(format!(
2869 "<DBInstanceClass>{}</DBInstanceClass>",
2870 xml_escape(class)
2871 ));
2872 }
2873 if let Some(allocated_storage) = pending.allocated_storage {
2874 fields.push(format!(
2875 "<AllocatedStorage>{}</AllocatedStorage>",
2876 allocated_storage
2877 ));
2878 }
2879 if let Some(backup_retention_period) = pending.backup_retention_period {
2880 fields.push(format!(
2881 "<BackupRetentionPeriod>{}</BackupRetentionPeriod>",
2882 backup_retention_period
2883 ));
2884 }
2885 if let Some(multi_az) = pending.multi_az {
2886 fields.push(format!(
2887 "<MultiAZ>{}</MultiAZ>",
2888 if multi_az { "true" } else { "false" }
2889 ));
2890 }
2891 if let Some(ref engine_version) = pending.engine_version {
2892 fields.push(format!(
2893 "<EngineVersion>{}</EngineVersion>",
2894 xml_escape(engine_version)
2895 ));
2896 }
2897 if pending.master_user_password.is_some() {
2898 fields.push("<MasterUserPassword>****</MasterUserPassword>".to_string());
2899 }
2900 if !fields.is_empty() {
2901 format!(
2902 "<PendingModifiedValues>{}</PendingModifiedValues>",
2903 fields.join("")
2904 )
2905 } else {
2906 String::new()
2907 }
2908 } else {
2909 String::new()
2910 };
2911
2912 let latest_restorable_time_xml = instance
2913 .latest_restorable_time
2914 .map(|t| {
2915 format!(
2916 "<LatestRestorableTime>{}</LatestRestorableTime>",
2917 t.to_rfc3339()
2918 )
2919 })
2920 .unwrap_or_default();
2921
2922 let endpoint_xml = if instance.endpoint_address.is_empty() || instance.port == 0 {
2927 String::new()
2928 } else {
2929 format!(
2930 "<Endpoint><Address>{}</Address><Port>{}</Port></Endpoint>",
2931 xml_escape(&instance.endpoint_address),
2932 instance.port
2933 )
2934 };
2935
2936 format!(
2937 "<DBInstanceIdentifier>{identifier}</DBInstanceIdentifier>\
2938 <DBInstanceClass>{class}</DBInstanceClass>\
2939 <Engine>{engine}</Engine>\
2940 <DBInstanceStatus>{status}</DBInstanceStatus>\
2941 <MasterUsername>{master_username}</MasterUsername>\
2942 {db_name_xml}\
2943 {endpoint_xml}\
2944 <AllocatedStorage>{allocated_storage}</AllocatedStorage>\
2945 <InstanceCreateTime>{create_time}</InstanceCreateTime>\
2946 <PreferredBackupWindow>{preferred_backup_window}</PreferredBackupWindow>\
2947 <BackupRetentionPeriod>{backup_retention_period}</BackupRetentionPeriod>\
2948 <DBSecurityGroups/>\
2949 {vpc_security_groups_xml}\
2950 {db_parameter_groups_xml}\
2951 <AvailabilityZone>us-east-1a</AvailabilityZone>\
2952 {latest_restorable_time_xml}\
2953 <PreferredMaintenanceWindow>sun:00:00-sun:00:30</PreferredMaintenanceWindow>\
2954 <MultiAZ>{multi_az}</MultiAZ>\
2955 <EngineVersion>{engine_version}</EngineVersion>\
2956 <AutoMinorVersionUpgrade>true</AutoMinorVersionUpgrade>\
2957 {read_replica_identifiers_xml}\
2958 {read_replica_source_xml}\
2959 <LicenseModel>{license_model}</LicenseModel>\
2960 {option_group_memberships_xml}\
2961 <PubliclyAccessible>{publicly_accessible}</PubliclyAccessible>\
2962 <StorageType>gp2</StorageType>\
2963 <DbInstancePort>{port}</DbInstancePort>\
2964 <StorageEncrypted>false</StorageEncrypted>\
2965 <DbiResourceId>{dbi_resource_id}</DbiResourceId>\
2966 <DeletionProtection>{deletion_protection}</DeletionProtection>\
2967 {pending_modified_values_xml}\
2968 <DBInstanceArn>{arn}</DBInstanceArn>",
2969 identifier = xml_escape(&instance.db_instance_identifier),
2970 class = xml_escape(&instance.db_instance_class),
2971 engine = xml_escape(&instance.engine),
2972 status = xml_escape(status),
2973 master_username = xml_escape(&instance.master_username),
2974 port = instance.port,
2975 allocated_storage = instance.allocated_storage,
2976 create_time = instance.created_at.to_rfc3339(),
2977 preferred_backup_window = xml_escape(&instance.preferred_backup_window),
2978 backup_retention_period = instance.backup_retention_period,
2979 multi_az = if instance.multi_az { "true" } else { "false" },
2980 engine_version = xml_escape(&instance.engine_version),
2981 license_model = license_model_for_engine(&instance.engine),
2982 publicly_accessible = if instance.publicly_accessible {
2983 "true"
2984 } else {
2985 "false"
2986 },
2987 dbi_resource_id = xml_escape(&instance.dbi_resource_id),
2988 deletion_protection = if instance.deletion_protection {
2989 "true"
2990 } else {
2991 "false"
2992 },
2993 arn = xml_escape(&instance.db_instance_arn),
2994 )
2995}
2996
2997fn db_snapshot_xml(snapshot: &DbSnapshot) -> String {
2998 format!(
2999 "<DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\
3000 <DBInstanceIdentifier>{}</DBInstanceIdentifier>\
3001 <SnapshotCreateTime>{}</SnapshotCreateTime>\
3002 <Engine>{}</Engine>\
3003 <EngineVersion>{}</EngineVersion>\
3004 <AllocatedStorage>{}</AllocatedStorage>\
3005 <Status>{}</Status>\
3006 <Port>{}</Port>\
3007 <MasterUsername>{}</MasterUsername>\
3008 {}\
3009 <DbiResourceId>{}</DbiResourceId>\
3010 <SnapshotType>{}</SnapshotType>\
3011 <DBSnapshotArn>{}</DBSnapshotArn>",
3012 xml_escape(&snapshot.db_snapshot_identifier),
3013 xml_escape(&snapshot.db_instance_identifier),
3014 snapshot.snapshot_create_time.to_rfc3339(),
3015 xml_escape(&snapshot.engine),
3016 xml_escape(&snapshot.engine_version),
3017 snapshot.allocated_storage,
3018 xml_escape(&snapshot.status),
3019 snapshot.port,
3020 xml_escape(&snapshot.master_username),
3021 snapshot
3022 .db_name
3023 .as_ref()
3024 .map(|name| format!("<DBName>{}</DBName>", xml_escape(name)))
3025 .unwrap_or_default(),
3026 xml_escape(&snapshot.dbi_resource_id),
3027 xml_escape(&snapshot.snapshot_type),
3028 xml_escape(&snapshot.db_snapshot_arn),
3029 )
3030}
3031
3032fn db_subnet_group_xml(subnet_group: &DbSubnetGroup) -> String {
3033 let subnets_xml = subnet_group
3034 .subnet_ids
3035 .iter()
3036 .zip(&subnet_group.subnet_availability_zones)
3037 .map(|(subnet_id, az)| {
3038 format!(
3039 "<Subnet>\
3040 <SubnetIdentifier>{}</SubnetIdentifier>\
3041 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
3042 <SubnetStatus>Active</SubnetStatus>\
3043 </Subnet>",
3044 xml_escape(subnet_id),
3045 xml_escape(az)
3046 )
3047 })
3048 .collect::<String>();
3049
3050 format!(
3051 "<DBSubnetGroupName>{}</DBSubnetGroupName>\
3052 <DBSubnetGroupDescription>{}</DBSubnetGroupDescription>\
3053 <VpcId>{}</VpcId>\
3054 <SubnetGroupStatus>Complete</SubnetGroupStatus>\
3055 <Subnets>{}</Subnets>\
3056 <DBSubnetGroupArn>{}</DBSubnetGroupArn>",
3057 xml_escape(&subnet_group.db_subnet_group_name),
3058 xml_escape(&subnet_group.db_subnet_group_description),
3059 xml_escape(&subnet_group.vpc_id),
3060 subnets_xml,
3061 xml_escape(&subnet_group.db_subnet_group_arn),
3062 )
3063}
3064
3065fn db_parameter_group_xml(parameter_group: &DbParameterGroup) -> String {
3066 format!(
3067 "<DBParameterGroupName>{}</DBParameterGroupName>\
3068 <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
3069 <Description>{}</Description>\
3070 <DBParameterGroupArn>{}</DBParameterGroupArn>",
3071 xml_escape(¶meter_group.db_parameter_group_name),
3072 xml_escape(¶meter_group.db_parameter_group_family),
3073 xml_escape(¶meter_group.description),
3074 xml_escape(¶meter_group.db_parameter_group_arn),
3075 )
3076}
3077
3078fn db_instance_not_found(identifier: &str) -> AwsServiceError {
3079 AwsServiceError::aws_error(
3080 StatusCode::NOT_FOUND,
3081 "DBInstanceNotFound",
3082 format!("DBInstance {} not found.", identifier),
3083 )
3084}
3085
3086fn db_snapshot_not_found(identifier: &str) -> AwsServiceError {
3087 AwsServiceError::aws_error(
3088 StatusCode::NOT_FOUND,
3089 "DBSnapshotNotFound",
3090 format!("DBSnapshot {} not found.", identifier),
3091 )
3092}
3093
3094fn db_instance_not_found_by_arn(resource_name: &str) -> AwsServiceError {
3095 AwsServiceError::aws_error(
3096 StatusCode::NOT_FOUND,
3097 "DBInstanceNotFound",
3098 format!("DBInstance {resource_name} not found."),
3099 )
3100}
3101
3102fn find_instance_by_arn<'a>(
3103 state: &'a crate::state::RdsState,
3104 resource_name: &str,
3105) -> Result<&'a DbInstance, AwsServiceError> {
3106 state
3107 .instances
3108 .values()
3109 .find(|instance| instance.db_instance_arn == resource_name)
3110 .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
3111}
3112
3113fn find_instance_by_arn_mut<'a>(
3114 state: &'a mut crate::state::RdsState,
3115 resource_name: &str,
3116) -> Result<&'a mut DbInstance, AwsServiceError> {
3117 state
3118 .instances
3119 .values_mut()
3120 .find(|instance| instance.db_instance_arn == resource_name)
3121 .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
3122}
3123
3124fn merge_tags(existing: &mut Vec<RdsTag>, incoming: &[RdsTag]) {
3125 for tag in incoming {
3126 if let Some(existing_tag) = existing
3127 .iter_mut()
3128 .find(|candidate| candidate.key == tag.key)
3129 {
3130 existing_tag.value = tag.value.clone();
3131 } else {
3132 existing.push(tag.clone());
3133 }
3134 }
3135}
3136
3137fn license_model_for_engine(engine: &str) -> &'static str {
3138 match engine {
3144 "mysql" | "mariadb" => "general-public-license",
3145 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "license-included",
3146 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "license-included",
3147 "db2-se" | "db2-ae" => "bring-your-own-license",
3148 _ => "postgresql-license",
3149 }
3150}
3151
3152fn default_db_name(engine: &str) -> &'static str {
3153 match engine {
3154 "mysql" | "mariadb" => "mysql",
3155 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "ORCL",
3160 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "master",
3164 "db2-se" | "db2-ae" => "BLUDB",
3165 _ => "postgres",
3166 }
3167}
3168
3169fn default_port_for_engine(engine: &str) -> i32 {
3173 match engine {
3174 "postgres" => 5432,
3175 "mysql" | "mariadb" => 3306,
3176 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => 1521,
3177 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => 1433,
3178 "db2-se" | "db2-ae" => 50000,
3179 _ => 5432,
3180 }
3181}
3182
3183fn default_parameter_group(engine: &str, engine_version: &str) -> String {
3189 match engine {
3190 "postgres" => {
3191 let major = engine_version.split('.').next().unwrap_or("16");
3192 format!("default.postgres{}", major)
3193 }
3194 "mysql" => {
3195 let major = if engine_version.starts_with("5.7") {
3196 "5.7"
3197 } else {
3198 "8.0"
3199 };
3200 format!("default.mysql{}", major)
3201 }
3202 "mariadb" => {
3203 let major = if engine_version.starts_with("11.4") {
3204 "11.4"
3205 } else if engine_version.starts_with("10.11") {
3206 "10.11"
3207 } else {
3208 "10.6"
3209 };
3210 format!("default.mariadb{}", major)
3211 }
3212 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
3213 let major = engine_version.split('.').next().unwrap_or("23");
3214 format!("default.{engine}-{major}")
3215 }
3216 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
3217 let major = engine_version.split('.').next().unwrap_or("16");
3220 format!("default.{engine}-{major}")
3221 }
3222 "db2-se" | "db2-ae" => {
3223 let mut parts = engine_version.split('.');
3226 let major = parts.next().unwrap_or("11");
3227 let minor = parts.next().unwrap_or("5");
3228 format!("default.{engine}-{major}.{minor}")
3229 }
3230 _ => "default.postgres16".to_string(),
3231 }
3232}
3233
3234fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
3235 match error {
3236 RuntimeError::Unavailable => AwsServiceError::aws_error(
3237 StatusCode::SERVICE_UNAVAILABLE,
3238 "InvalidParameterValue",
3239 "Docker/Podman is required for RDS DB instances but is not available",
3240 ),
3241 RuntimeError::ContainerStartFailed(message) => AwsServiceError::aws_error(
3242 StatusCode::INTERNAL_SERVER_ERROR,
3243 "InternalFailure",
3244 message,
3245 ),
3246 }
3247}
3248
3249#[cfg(test)]
3250mod tests {
3251 use std::collections::HashMap;
3252 use std::sync::Arc;
3253
3254 use bytes::Bytes;
3255 use chrono::Utc;
3256 use http::{HeaderMap, Method};
3257 use parking_lot::RwLock;
3258 use uuid::Uuid;
3259
3260 use super::{
3261 db_instance_xml, default_db_name, default_parameter_group, default_port_for_engine,
3262 filter_engine_versions, filter_orderable_options, license_model_for_engine, merge_tags,
3263 optional_i32_param, parse_tag_keys, parse_tags, validate_create_request, RdsService,
3264 RdsSourceType,
3265 };
3266 use crate::state::{default_engine_versions, default_orderable_options, DbInstance, RdsTag};
3267 use fakecloud_core::delivery::DeliveryBus;
3268 use fakecloud_core::service::{AwsRequest, AwsService, AwsServiceError};
3269
3270 #[test]
3271 fn default_port_matches_aws_for_each_engine() {
3272 assert_eq!(default_port_for_engine("postgres"), 5432);
3273 assert_eq!(default_port_for_engine("mysql"), 3306);
3274 assert_eq!(default_port_for_engine("mariadb"), 3306);
3275 assert_eq!(default_port_for_engine("oracle-ee"), 1521);
3276 assert_eq!(default_port_for_engine("oracle-se2"), 1521);
3277 assert_eq!(default_port_for_engine("sqlserver-ee"), 1433);
3278 assert_eq!(default_port_for_engine("sqlserver-ex"), 1433);
3279 assert_eq!(default_port_for_engine("db2-se"), 50000);
3280 assert_eq!(default_port_for_engine("db2-ae"), 50000);
3281 }
3282
3283 #[test]
3284 fn default_parameter_group_uses_engine_major_version() {
3285 assert_eq!(
3286 default_parameter_group("postgres", "16.3"),
3287 "default.postgres16"
3288 );
3289 assert_eq!(
3290 default_parameter_group("mysql", "8.0.35"),
3291 "default.mysql8.0"
3292 );
3293 assert_eq!(
3294 default_parameter_group("oracle-ee", "23.0.0"),
3295 "default.oracle-ee-23"
3296 );
3297 assert_eq!(
3298 default_parameter_group("sqlserver-ex", "16.00.4085.2.v1"),
3299 "default.sqlserver-ex-16"
3300 );
3301 assert_eq!(
3302 default_parameter_group("db2-se", "11.5.9.0.sb00000000.r1"),
3303 "default.db2-se-11.5"
3304 );
3305 }
3306
3307 #[test]
3308 fn license_model_reflects_engine_class() {
3309 assert_eq!(license_model_for_engine("postgres"), "postgresql-license");
3310 assert_eq!(license_model_for_engine("mysql"), "general-public-license");
3311 assert_eq!(license_model_for_engine("oracle-ee"), "license-included");
3312 assert_eq!(license_model_for_engine("sqlserver-se"), "license-included");
3313 assert_eq!(license_model_for_engine("db2-ae"), "bring-your-own-license");
3314 }
3315
3316 #[test]
3317 fn default_db_name_picks_per_engine_default() {
3318 assert_eq!(default_db_name("postgres"), "postgres");
3319 assert_eq!(default_db_name("mysql"), "mysql");
3320 assert_eq!(default_db_name("oracle-ee"), "ORCL");
3321 assert_eq!(default_db_name("sqlserver-ex"), "master");
3322 assert_eq!(default_db_name("db2-se"), "BLUDB");
3323 }
3324
3325 #[test]
3326 fn validate_create_request_accepts_new_engines() {
3327 for (engine, version, port) in [
3328 ("oracle-ee", "23.0.0", 1521),
3329 ("sqlserver-ex", "16.00.4085.2.v1", 1433),
3330 ("db2-se", "11.5.9.0.sb00000000.r1", 50000),
3331 ] {
3332 validate_create_request("test-db", 20, "db.t3.micro", engine, version, port)
3333 .expect("engine should be accepted");
3334 }
3335 }
3336
3337 #[test]
3338 fn validate_create_request_rejects_unsupported_engine_version() {
3339 let err =
3340 validate_create_request("test-db", 20, "db.t3.micro", "oracle-ee", "12.0.0", 1521)
3341 .expect_err("12.x is not in the supported list");
3342 let msg = format!("{err:?}");
3343 assert!(msg.contains("EngineVersion"), "unexpected: {msg}");
3344 }
3345
3346 #[test]
3347 fn filter_engine_versions_matches_requested_engine() {
3348 let versions = default_engine_versions();
3349
3350 let filtered =
3351 filter_engine_versions(&versions, &Some("postgres".to_string()), &None, &None);
3352
3353 assert_eq!(filtered.len(), 4); assert!(filtered.iter().all(|v| v.engine == "postgres"));
3355 }
3356
3357 #[test]
3358 fn filter_orderable_options_respects_instance_class() {
3359 let options = default_orderable_options();
3360
3361 let filtered = filter_orderable_options(
3362 &options,
3363 &Some("postgres".to_string()),
3364 &Some("16.3".to_string()),
3365 &Some("db.t3.micro".to_string()),
3366 &None,
3367 Some(true),
3368 );
3369
3370 assert_eq!(filtered.len(), 1);
3371 assert_eq!(filtered[0].db_instance_class, "db.t3.micro");
3372 }
3373
3374 #[test]
3375 fn validate_create_request_rejects_unsupported_engine() {
3376 let error = validate_create_request("test-db", 20, "db.t3.micro", "mysql", "16.3", 5432)
3377 .expect_err("unsupported engine");
3378
3379 assert_eq!(error.code(), "InvalidParameterValue");
3380 }
3381
3382 #[test]
3383 fn optional_i32_param_rejects_invalid_integer() {
3384 let request = request("CreateDBInstance", &[("Port", "not-a-number")]);
3385
3386 let error = optional_i32_param(&request, "Port").expect_err("invalid port");
3387
3388 assert_eq!(error.code(), "InvalidParameterValue");
3389 }
3390
3391 #[test]
3392 fn db_instance_xml_renders_endpoint_and_status() {
3393 let created_at = Utc::now();
3394 let instance = DbInstance {
3395 db_instance_identifier: "test-db".to_string(),
3396 db_instance_arn: "arn:aws:rds:us-east-1:123456789012:db:test-db".to_string(),
3397 db_instance_class: "db.t3.micro".to_string(),
3398 engine: "postgres".to_string(),
3399 engine_version: "16.3".to_string(),
3400 db_instance_status: "available".to_string(),
3401 master_username: "admin".to_string(),
3402 db_name: Some("appdb".to_string()),
3403 endpoint_address: "127.0.0.1".to_string(),
3404 port: 15432,
3405 allocated_storage: 20,
3406 publicly_accessible: true,
3407 deletion_protection: false,
3408 created_at,
3409 dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3410 master_user_password: "secret123".to_string(),
3411 container_id: "container".to_string(),
3412 host_port: 15432,
3413 tags: Vec::new(),
3414 read_replica_source_db_instance_identifier: None,
3415 read_replica_db_instance_identifiers: Vec::new(),
3416 vpc_security_group_ids: vec!["sg-12345678".to_string()],
3417 db_parameter_group_name: Some("default.postgres16".to_string()),
3418 backup_retention_period: 1,
3419 preferred_backup_window: "03:00-04:00".to_string(),
3420 latest_restorable_time: Some(created_at),
3421 option_group_name: None,
3422 multi_az: false,
3423 pending_modified_values: None,
3424 };
3425
3426 let xml = db_instance_xml(&instance, Some("creating"));
3427
3428 assert!(xml.contains("<DBInstanceIdentifier>test-db</DBInstanceIdentifier>"));
3429 assert!(xml.contains("<DBInstanceStatus>creating</DBInstanceStatus>"));
3430 assert!(xml.contains("<Address>127.0.0.1</Address><Port>15432</Port>"));
3431 }
3432
3433 #[test]
3434 fn parse_tags_reads_rds_query_shape() {
3435 let request = request(
3436 "AddTagsToResource",
3437 &[
3438 ("Tags.Tag.1.Key", "env"),
3439 ("Tags.Tag.1.Value", "dev"),
3440 ("Tags.Tag.2.Key", "team"),
3441 ("Tags.Tag.2.Value", "core"),
3442 ],
3443 );
3444
3445 let tags = parse_tags(&request).expect("tags");
3446
3447 assert_eq!(
3448 tags,
3449 vec![
3450 RdsTag {
3451 key: "env".to_string(),
3452 value: "dev".to_string(),
3453 },
3454 RdsTag {
3455 key: "team".to_string(),
3456 value: "core".to_string(),
3457 }
3458 ]
3459 );
3460 }
3461
3462 #[test]
3463 fn parse_tag_keys_reads_member_shape() {
3464 let request = request(
3465 "RemoveTagsFromResource",
3466 &[("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")],
3467 );
3468
3469 let tag_keys = parse_tag_keys(&request).expect("tag keys");
3470
3471 assert_eq!(tag_keys, vec!["env".to_string(), "team".to_string()]);
3472 }
3473
3474 #[test]
3475 fn merge_tags_updates_existing_values() {
3476 let mut tags = vec![RdsTag {
3477 key: "env".to_string(),
3478 value: "dev".to_string(),
3479 }];
3480
3481 merge_tags(
3482 &mut tags,
3483 &[
3484 RdsTag {
3485 key: "env".to_string(),
3486 value: "prod".to_string(),
3487 },
3488 RdsTag {
3489 key: "team".to_string(),
3490 value: "core".to_string(),
3491 },
3492 ],
3493 );
3494
3495 assert_eq!(tags.len(), 2);
3496 assert_eq!(tags[0].value, "prod");
3497 assert_eq!(tags[1].key, "team");
3498 }
3499
3500 #[tokio::test]
3501 async fn describe_engine_versions_returns_xml_body() {
3502 let service = RdsService::new(Arc::new(RwLock::new(
3503 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3504 )));
3505 let request = request("DescribeDBEngineVersions", &[("Engine", "postgres")]);
3506
3507 let response = service.handle(request).await.expect("response");
3508 let body = String::from_utf8(response.body.expect_bytes().to_vec()).expect("utf8");
3509
3510 assert!(body.contains("<DescribeDBEngineVersionsResponse"));
3511 assert!(body.contains("<Engine>postgres</Engine>"));
3512 assert!(body.contains("<DBParameterGroupFamily>postgres16</DBParameterGroupFamily>"));
3513 }
3514
3515 fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
3516 let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
3517 for (key, value) in params {
3518 query_params.insert((*key).to_string(), (*value).to_string());
3519 }
3520
3521 AwsRequest {
3522 service: "rds".to_string(),
3523 action: action.to_string(),
3524 region: "us-east-1".to_string(),
3525 account_id: "123456789012".to_string(),
3526 request_id: "test-request-id".to_string(),
3527 headers: HeaderMap::new(),
3528 query_params,
3529 body: Bytes::new(),
3530 body_stream: parking_lot::Mutex::new(None),
3531 path_segments: vec![],
3532 raw_path: "/".to_string(),
3533 raw_query: String::new(),
3534 method: Method::POST,
3535 is_query_protocol: true,
3536 access_key_id: None,
3537 principal: None,
3538 }
3539 }
3540
3541 fn make_service() -> RdsService {
3544 RdsService::new(Arc::new(RwLock::new(
3545 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3546 )))
3547 }
3548
3549 #[derive(Default)]
3550 struct CapturedEvent {
3551 source: String,
3552 detail_type: String,
3553 detail: String,
3554 }
3555
3556 #[derive(Default)]
3557 struct RecordingEb {
3558 events: std::sync::Mutex<Vec<CapturedEvent>>,
3559 }
3560
3561 impl fakecloud_core::delivery::EventBridgeDelivery for RecordingEb {
3562 fn put_event(&self, source: &str, detail_type: &str, detail: &str, _bus: &str) {
3563 self.events.lock().unwrap().push(CapturedEvent {
3564 source: source.to_string(),
3565 detail_type: detail_type.to_string(),
3566 detail: detail.to_string(),
3567 });
3568 }
3569 }
3570
3571 fn make_service_with_recorder() -> (RdsService, Arc<RecordingEb>) {
3572 let recorder = Arc::new(RecordingEb::default());
3573 let bus = Arc::new(DeliveryBus::new().with_eventbridge(recorder.clone()));
3574 let svc = RdsService::new(Arc::new(RwLock::new(
3575 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3576 )))
3577 .with_delivery_bus(bus);
3578 (svc, recorder)
3579 }
3580
3581 #[test]
3582 fn emit_event_emits_aws_rds_event_via_bus() {
3583 let (svc, rec) = make_service_with_recorder();
3584 svc.emit_event(
3585 RdsSourceType::DbInstance,
3586 "my-db",
3587 "arn:aws:rds:us-east-1:123456789012:db:my-db",
3588 "RDS-EVENT-0005",
3589 &["creation"],
3590 "DB instance created",
3591 );
3592 let events = rec.events.lock().unwrap();
3593 assert_eq!(events.len(), 1);
3594 let e = &events[0];
3595 assert_eq!(e.source, "aws.rds");
3596 assert_eq!(e.detail_type, "RDS DB Instance Event");
3597 let detail: serde_json::Value = serde_json::from_str(&e.detail).unwrap();
3598 assert_eq!(detail["EventID"], "RDS-EVENT-0005");
3599 assert_eq!(detail["SourceType"], "DB_INSTANCE");
3600 assert_eq!(detail["SourceIdentifier"], "my-db");
3601 assert_eq!(detail["Message"], "DB instance created");
3602 assert_eq!(detail["EventCategories"][0], "creation");
3603 }
3604
3605 #[test]
3606 fn emit_event_no_op_without_bus() {
3607 let svc = make_service();
3608 svc.emit_event(
3609 RdsSourceType::DbSnapshot,
3610 "snap",
3611 "arn:aws:rds:us-east-1:123456789012:snapshot:snap",
3612 "RDS-EVENT-0042",
3613 &["creation"],
3614 "Manual snapshot created",
3615 );
3616 }
3617
3618 #[test]
3619 fn rds_source_type_detail_type_mapping() {
3620 assert_eq!(
3621 RdsSourceType::DbInstance.detail_type(),
3622 "RDS DB Instance Event"
3623 );
3624 assert_eq!(
3625 RdsSourceType::DbSnapshot.detail_type(),
3626 "RDS DB Snapshot Event"
3627 );
3628 assert_eq!(
3629 RdsSourceType::DbParameterGroup.detail_type(),
3630 "RDS DB Parameter Group Event"
3631 );
3632 }
3633
3634 fn body_of(resp: fakecloud_core::service::AwsResponse) -> String {
3635 String::from_utf8(resp.body.expect_bytes().to_vec()).expect("utf8")
3636 }
3637
3638 fn seed_instance(svc: &RdsService, identifier: &str) -> String {
3639 let arn = format!("arn:aws:rds:us-east-1:123456789012:db:{identifier}");
3640 let mut accounts = svc.state.write();
3641 let state = accounts.default_mut();
3642 state.instances.insert(
3643 identifier.to_string(),
3644 DbInstance {
3645 db_instance_identifier: identifier.to_string(),
3646 db_instance_arn: arn.clone(),
3647 db_instance_class: "db.t3.micro".to_string(),
3648 engine: "postgres".to_string(),
3649 engine_version: "16.3".to_string(),
3650 db_instance_status: "available".to_string(),
3651 master_username: "admin".to_string(),
3652 db_name: Some("appdb".to_string()),
3653 endpoint_address: "127.0.0.1".to_string(),
3654 port: 15432,
3655 allocated_storage: 20,
3656 publicly_accessible: true,
3657 deletion_protection: false,
3658 created_at: Utc::now(),
3659 dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3660 master_user_password: "secret".to_string(),
3661 container_id: "container".to_string(),
3662 host_port: 15432,
3663 tags: Vec::new(),
3664 read_replica_source_db_instance_identifier: None,
3665 read_replica_db_instance_identifiers: Vec::new(),
3666 vpc_security_group_ids: vec!["sg-12345678".to_string()],
3667 db_parameter_group_name: Some("default.postgres16".to_string()),
3668 backup_retention_period: 1,
3669 preferred_backup_window: "03:00-04:00".to_string(),
3670 latest_restorable_time: None,
3671 option_group_name: None,
3672 multi_az: false,
3673 pending_modified_values: None,
3674 },
3675 );
3676 arn
3677 }
3678
3679 fn assert_code<T>(result: Result<T, AwsServiceError>, expected_code: &str) -> AwsServiceError {
3680 match result {
3681 Ok(_) => panic!("expected error {expected_code}, got Ok"),
3682 Err(e) => {
3683 assert_eq!(e.code(), expected_code, "wrong error code");
3684 e
3685 }
3686 }
3687 }
3688
3689 #[test]
3692 fn add_tags_requires_resource_name() {
3693 let svc = make_service();
3694 let req = request("AddTagsToResource", &[]);
3695 assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3696 }
3697
3698 #[test]
3699 fn add_tags_requires_at_least_one_tag() {
3700 let svc = make_service();
3701 let arn = seed_instance(&svc, "db1");
3702 let req = request("AddTagsToResource", &[("ResourceName", arn.as_str())]);
3703 assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3704 }
3705
3706 #[test]
3707 fn add_tags_appends_then_list_tags_returns_them() {
3708 let svc = make_service();
3709 let arn = seed_instance(&svc, "db1");
3710 let add_req = request(
3711 "AddTagsToResource",
3712 &[
3713 ("ResourceName", arn.as_str()),
3714 ("Tags.Tag.1.Key", "env"),
3715 ("Tags.Tag.1.Value", "dev"),
3716 ],
3717 );
3718 svc.add_tags_to_resource(&add_req).unwrap();
3719
3720 let list_req = request("ListTagsForResource", &[("ResourceName", arn.as_str())]);
3721 let body = body_of(svc.list_tags_for_resource(&list_req).unwrap());
3722 assert!(body.contains("<Key>env</Key>"));
3723 assert!(body.contains("<Value>dev</Value>"));
3724 }
3725
3726 #[test]
3727 fn list_tags_rejects_filters_param() {
3728 let svc = make_service();
3729 let arn = seed_instance(&svc, "db1");
3730 let req = request(
3731 "ListTagsForResource",
3732 &[
3733 ("ResourceName", arn.as_str()),
3734 ("Filters.Filter.1.Name", "x"),
3735 ],
3736 );
3737 assert_code(svc.list_tags_for_resource(&req), "InvalidParameterValue");
3738 }
3739
3740 #[test]
3741 fn list_tags_unknown_arn_errors() {
3742 let svc = make_service();
3743 let req = request(
3744 "ListTagsForResource",
3745 &[("ResourceName", "arn:aws:rds:us-east-1:123456789012:db:nope")],
3746 );
3747 assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
3748 }
3749
3750 #[test]
3751 fn remove_tags_strips_only_listed_keys() {
3752 let svc = make_service();
3753 let arn = seed_instance(&svc, "db1");
3754 {
3755 let mut __a = svc.state.write();
3756 let state = __a.default_mut();
3757 let inst = state.instances.get_mut("db1").unwrap();
3758 inst.tags = vec![
3759 RdsTag {
3760 key: "env".to_string(),
3761 value: "dev".to_string(),
3762 },
3763 RdsTag {
3764 key: "team".to_string(),
3765 value: "core".to_string(),
3766 },
3767 ];
3768 }
3769 let req = request(
3770 "RemoveTagsFromResource",
3771 &[("ResourceName", arn.as_str()), ("TagKeys.member.1", "env")],
3772 );
3773 svc.remove_tags_from_resource(&req).unwrap();
3774
3775 let __a = svc.state.read();
3776 let state = __a.default_ref();
3777 let tags = &state.instances.get("db1").unwrap().tags;
3778 assert_eq!(tags.len(), 1);
3779 assert_eq!(tags[0].key, "team");
3780 }
3781
3782 #[test]
3783 fn remove_tags_requires_keys() {
3784 let svc = make_service();
3785 let arn = seed_instance(&svc, "db1");
3786 let req = request("RemoveTagsFromResource", &[("ResourceName", arn.as_str())]);
3787 assert_code(svc.remove_tags_from_resource(&req), "MissingParameter");
3788 }
3789
3790 fn create_subnet_group(svc: &RdsService, name: &str) {
3793 let req = request(
3794 "CreateDBSubnetGroup",
3795 &[
3796 ("DBSubnetGroupName", name),
3797 ("DBSubnetGroupDescription", "test"),
3798 ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3799 ("SubnetIds.SubnetIdentifier.2", "subnet-bbb"),
3800 ],
3801 );
3802 svc.create_db_subnet_group(&req).unwrap();
3803 }
3804
3805 #[test]
3806 fn create_db_subnet_group_requires_two_subnets() {
3807 let svc = make_service();
3808 let req = request(
3809 "CreateDBSubnetGroup",
3810 &[
3811 ("DBSubnetGroupName", "sg1"),
3812 ("DBSubnetGroupDescription", "t"),
3813 ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3814 ],
3815 );
3816 assert_code(
3817 svc.create_db_subnet_group(&req),
3818 "DBSubnetGroupDoesNotCoverEnoughAZs",
3819 );
3820 }
3821
3822 #[test]
3823 fn create_db_subnet_group_rejects_empty_subnets() {
3824 let svc = make_service();
3825 let req = request(
3826 "CreateDBSubnetGroup",
3827 &[
3828 ("DBSubnetGroupName", "sg1"),
3829 ("DBSubnetGroupDescription", "t"),
3830 ],
3831 );
3832 assert_code(svc.create_db_subnet_group(&req), "InvalidParameterValue");
3833 }
3834
3835 #[test]
3836 fn create_db_subnet_group_rejects_duplicates() {
3837 let svc = make_service();
3838 create_subnet_group(&svc, "sg1");
3839 let req = request(
3840 "CreateDBSubnetGroup",
3841 &[
3842 ("DBSubnetGroupName", "sg1"),
3843 ("DBSubnetGroupDescription", "t"),
3844 ("SubnetIds.SubnetIdentifier.1", "subnet-x"),
3845 ("SubnetIds.SubnetIdentifier.2", "subnet-y"),
3846 ],
3847 );
3848 assert_code(
3849 svc.create_db_subnet_group(&req),
3850 "DBSubnetGroupAlreadyExists",
3851 );
3852 }
3853
3854 #[test]
3855 fn describe_db_subnet_groups_by_name_or_list() {
3856 let svc = make_service();
3857 create_subnet_group(&svc, "sg-alpha");
3858 create_subnet_group(&svc, "sg-beta");
3859
3860 let by_name = request(
3861 "DescribeDBSubnetGroups",
3862 &[("DBSubnetGroupName", "sg-alpha")],
3863 );
3864 let body = body_of(svc.describe_db_subnet_groups(&by_name).unwrap());
3865 assert!(body.contains("sg-alpha"));
3866 assert!(!body.contains("sg-beta"));
3867
3868 let list_all = request("DescribeDBSubnetGroups", &[]);
3869 let body = body_of(svc.describe_db_subnet_groups(&list_all).unwrap());
3870 assert!(body.contains("sg-alpha"));
3871 assert!(body.contains("sg-beta"));
3872 }
3873
3874 #[test]
3875 fn describe_db_subnet_groups_unknown_name_errors() {
3876 let svc = make_service();
3877 let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
3878 assert_code(
3879 svc.describe_db_subnet_groups(&req),
3880 "DBSubnetGroupNotFoundFault",
3881 );
3882 }
3883
3884 #[test]
3885 fn delete_db_subnet_group_unknown_errors() {
3886 let svc = make_service();
3887 let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
3888 assert_code(
3889 svc.delete_db_subnet_group(&req),
3890 "DBSubnetGroupNotFoundFault",
3891 );
3892 }
3893
3894 #[test]
3895 fn delete_db_subnet_group_removes_entry() {
3896 let svc = make_service();
3897 create_subnet_group(&svc, "sg1");
3898 let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "sg1")]);
3899 svc.delete_db_subnet_group(&req).unwrap();
3900 assert!(svc.state.read().default_ref().subnet_groups.is_empty());
3901 }
3902
3903 #[test]
3904 fn modify_db_subnet_group_updates_subnet_ids() {
3905 let svc = make_service();
3906 create_subnet_group(&svc, "sg1");
3907 let req = request(
3908 "ModifyDBSubnetGroup",
3909 &[
3910 ("DBSubnetGroupName", "sg1"),
3911 ("SubnetIds.SubnetIdentifier.1", "subnet-new1"),
3912 ("SubnetIds.SubnetIdentifier.2", "subnet-new2"),
3913 ],
3914 );
3915 svc.modify_db_subnet_group(&req).unwrap();
3916
3917 let __a = svc.state.read();
3918 let state = __a.default_ref();
3919 let sg = state.subnet_groups.get("sg1").unwrap();
3920 assert_eq!(sg.subnet_ids, vec!["subnet-new1", "subnet-new2"]);
3921 }
3922
3923 fn create_param_group(svc: &RdsService, name: &str) {
3926 let req = request(
3927 "CreateDBParameterGroup",
3928 &[
3929 ("DBParameterGroupName", name),
3930 ("DBParameterGroupFamily", "postgres16"),
3931 ("Description", "test"),
3932 ],
3933 );
3934 svc.create_db_parameter_group(&req).unwrap();
3935 }
3936
3937 #[test]
3938 fn create_db_parameter_group_rejects_unknown_family() {
3939 let svc = make_service();
3940 let req = request(
3941 "CreateDBParameterGroup",
3942 &[
3943 ("DBParameterGroupName", "pg1"),
3944 ("DBParameterGroupFamily", "oracle19"),
3945 ("Description", "t"),
3946 ],
3947 );
3948 assert_code(svc.create_db_parameter_group(&req), "InvalidParameterValue");
3949 }
3950
3951 #[test]
3952 fn create_db_parameter_group_rejects_duplicates() {
3953 let svc = make_service();
3954 create_param_group(&svc, "pg1");
3955 let req = request(
3956 "CreateDBParameterGroup",
3957 &[
3958 ("DBParameterGroupName", "pg1"),
3959 ("DBParameterGroupFamily", "postgres16"),
3960 ("Description", "t"),
3961 ],
3962 );
3963 assert_code(
3964 svc.create_db_parameter_group(&req),
3965 "DBParameterGroupAlreadyExists",
3966 );
3967 }
3968
3969 #[test]
3970 fn describe_db_parameter_groups_by_name_or_list() {
3971 let svc = make_service();
3972 create_param_group(&svc, "pg-alpha");
3973 create_param_group(&svc, "pg-beta");
3974 let by_name = request(
3975 "DescribeDBParameterGroups",
3976 &[("DBParameterGroupName", "pg-alpha")],
3977 );
3978 let body = body_of(svc.describe_db_parameter_groups(&by_name).unwrap());
3979 assert!(body.contains("pg-alpha"));
3980 assert!(!body.contains("pg-beta"));
3981 let list = request("DescribeDBParameterGroups", &[]);
3982 let body = body_of(svc.describe_db_parameter_groups(&list).unwrap());
3983 assert!(body.contains("pg-alpha"));
3984 assert!(body.contains("pg-beta"));
3985 }
3986
3987 #[test]
3988 fn describe_db_parameter_groups_unknown_name_errors() {
3989 let svc = make_service();
3990 let req = request(
3991 "DescribeDBParameterGroups",
3992 &[("DBParameterGroupName", "ghost")],
3993 );
3994 assert_code(
3995 svc.describe_db_parameter_groups(&req),
3996 "DBParameterGroupNotFound",
3997 );
3998 }
3999
4000 #[test]
4001 fn delete_db_parameter_group_rejects_default_groups() {
4002 let svc = make_service();
4003 let req = request(
4004 "DeleteDBParameterGroup",
4005 &[("DBParameterGroupName", "default.postgres16")],
4006 );
4007 assert_code(svc.delete_db_parameter_group(&req), "InvalidParameterValue");
4008 }
4009
4010 #[test]
4011 fn delete_db_parameter_group_unknown_errors() {
4012 let svc = make_service();
4013 let req = request(
4014 "DeleteDBParameterGroup",
4015 &[("DBParameterGroupName", "ghost")],
4016 );
4017 assert_code(
4018 svc.delete_db_parameter_group(&req),
4019 "DBParameterGroupNotFound",
4020 );
4021 }
4022
4023 #[test]
4024 fn delete_db_parameter_group_removes_entry() {
4025 let svc = make_service();
4026 create_param_group(&svc, "pg1");
4027 let req = request("DeleteDBParameterGroup", &[("DBParameterGroupName", "pg1")]);
4028 svc.delete_db_parameter_group(&req).unwrap();
4029 assert!(!svc
4030 .state
4031 .read()
4032 .default_ref()
4033 .parameter_groups
4034 .contains_key("pg1"));
4035 }
4036
4037 #[test]
4038 fn modify_db_parameter_group_updates_description() {
4039 let svc = make_service();
4040 create_param_group(&svc, "pg1");
4041 let req = request(
4042 "ModifyDBParameterGroup",
4043 &[
4044 ("DBParameterGroupName", "pg1"),
4045 ("Description", "shiny new"),
4046 ],
4047 );
4048 svc.modify_db_parameter_group(&req).unwrap();
4049 let __a = svc.state.read();
4050 let state = __a.default_ref();
4051 assert_eq!(
4052 state.parameter_groups.get("pg1").unwrap().description,
4053 "shiny new"
4054 );
4055 }
4056
4057 #[test]
4058 fn modify_db_parameter_group_unknown_errors() {
4059 let svc = make_service();
4060 let req = request(
4061 "ModifyDBParameterGroup",
4062 &[("DBParameterGroupName", "ghost"), ("Description", "x")],
4063 );
4064 assert_code(
4065 svc.modify_db_parameter_group(&req),
4066 "DBParameterGroupNotFound",
4067 );
4068 }
4069
4070 #[test]
4073 fn describe_db_instances_by_id_returns_only_one() {
4074 let svc = make_service();
4075 seed_instance(&svc, "db1");
4076 seed_instance(&svc, "db2");
4077 let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "db1")]);
4078 let body = body_of(svc.describe_db_instances(&req).unwrap());
4079 assert!(body.contains("<DBInstanceIdentifier>db1</DBInstanceIdentifier>"));
4080 assert!(!body.contains("<DBInstanceIdentifier>db2</DBInstanceIdentifier>"));
4081 }
4082
4083 #[test]
4084 fn describe_db_instances_unknown_id_errors() {
4085 let svc = make_service();
4086 let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
4087 assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
4088 }
4089
4090 #[test]
4091 fn describe_db_instances_lists_all_when_unbounded() {
4092 let svc = make_service();
4093 seed_instance(&svc, "db1");
4094 seed_instance(&svc, "db2");
4095 seed_instance(&svc, "db3");
4096 let req = request("DescribeDBInstances", &[]);
4097 let body = body_of(svc.describe_db_instances(&req).unwrap());
4098 for id in ["db1", "db2", "db3"] {
4099 assert!(body.contains(&format!(
4100 "<DBInstanceIdentifier>{id}</DBInstanceIdentifier>"
4101 )));
4102 }
4103 }
4104
4105 #[test]
4108 fn modify_db_instance_requires_at_least_one_change() {
4109 let svc = make_service();
4110 seed_instance(&svc, "db1");
4111 let req = request("ModifyDBInstance", &[("DBInstanceIdentifier", "db1")]);
4112 assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
4113 }
4114
4115 #[test]
4116 fn modify_db_instance_unknown_errors() {
4117 let svc = make_service();
4118 let req = request(
4119 "ModifyDBInstance",
4120 &[
4121 ("DBInstanceIdentifier", "ghost"),
4122 ("DBInstanceClass", "db.t3.small"),
4123 ],
4124 );
4125 assert_code(svc.modify_db_instance(&req), "DBInstanceNotFound");
4126 }
4127
4128 #[test]
4129 fn modify_db_instance_apply_immediately_updates_class() {
4130 let svc = make_service();
4131 seed_instance(&svc, "db1");
4132 let req = request(
4133 "ModifyDBInstance",
4134 &[
4135 ("DBInstanceIdentifier", "db1"),
4136 ("DBInstanceClass", "db.t3.small"),
4137 ("ApplyImmediately", "true"),
4138 ],
4139 );
4140 svc.modify_db_instance(&req).unwrap();
4141 let __a = svc.state.read();
4142 let state = __a.default_ref();
4143 assert_eq!(
4144 state.instances.get("db1").unwrap().db_instance_class,
4145 "db.t3.small"
4146 );
4147 }
4148
4149 #[test]
4150 fn modify_db_instance_pending_when_not_apply_immediately() {
4151 let svc = make_service();
4152 seed_instance(&svc, "db1");
4153 let req = request(
4154 "ModifyDBInstance",
4155 &[
4156 ("DBInstanceIdentifier", "db1"),
4157 ("DBInstanceClass", "db.t3.small"),
4158 ("ApplyImmediately", "false"),
4159 ],
4160 );
4161 svc.modify_db_instance(&req).unwrap();
4162 let __a = svc.state.read();
4163 let state = __a.default_ref();
4164 let inst = state.instances.get("db1").unwrap();
4165 assert_eq!(inst.db_instance_class, "db.t3.micro");
4166 assert_eq!(
4167 inst.pending_modified_values
4168 .as_ref()
4169 .unwrap()
4170 .db_instance_class
4171 .as_deref(),
4172 Some("db.t3.small"),
4173 );
4174 }
4175
4176 fn seed_snapshot(svc: &RdsService, snapshot_id: &str, instance_id: &str) {
4179 let mut __a = svc.state.write();
4180 let state = __a.default_mut();
4181 let arn = state.db_snapshot_arn(snapshot_id);
4182 state.snapshots.insert(
4183 snapshot_id.to_string(),
4184 crate::state::DbSnapshot {
4185 db_snapshot_identifier: snapshot_id.to_string(),
4186 db_snapshot_arn: arn,
4187 db_instance_identifier: instance_id.to_string(),
4188 snapshot_create_time: Utc::now(),
4189 engine: "postgres".to_string(),
4190 engine_version: "16.3".to_string(),
4191 allocated_storage: 20,
4192 status: "available".to_string(),
4193 port: 5432,
4194 master_username: "admin".to_string(),
4195 db_name: Some("appdb".to_string()),
4196 dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
4197 snapshot_type: "manual".to_string(),
4198 master_user_password: "secret".to_string(),
4199 tags: Vec::new(),
4200 dump_data: Vec::new(),
4201 },
4202 );
4203 }
4204
4205 #[test]
4206 fn delete_db_snapshot_removes_entry() {
4207 let svc = make_service();
4208 seed_snapshot(&svc, "snap1", "db1");
4209 let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "snap1")]);
4210 svc.delete_db_snapshot(&req).unwrap();
4211 assert!(svc.state.read().default_ref().snapshots.is_empty());
4212 }
4213
4214 #[test]
4215 fn delete_db_snapshot_unknown_errors() {
4216 let svc = make_service();
4217 let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "ghost")]);
4218 assert_code(svc.delete_db_snapshot(&req), "DBSnapshotNotFound");
4219 }
4220
4221 #[test]
4222 fn describe_db_snapshots_rejects_both_filters() {
4223 let svc = make_service();
4224 let req = request(
4225 "DescribeDBSnapshots",
4226 &[("DBSnapshotIdentifier", "s"), ("DBInstanceIdentifier", "i")],
4227 );
4228 assert_code(
4229 svc.describe_db_snapshots(&req),
4230 "InvalidParameterCombination",
4231 );
4232 }
4233
4234 #[test]
4235 fn describe_db_snapshots_by_id_or_instance() {
4236 let svc = make_service();
4237 seed_snapshot(&svc, "snap1", "db1");
4238 seed_snapshot(&svc, "snap2", "db2");
4239
4240 let by_id = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "snap1")]);
4241 let body = body_of(svc.describe_db_snapshots(&by_id).unwrap());
4242 assert!(body.contains("snap1"));
4243 assert!(!body.contains("snap2"));
4244
4245 let by_instance = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "db2")]);
4246 let body = body_of(svc.describe_db_snapshots(&by_instance).unwrap());
4247 assert!(body.contains("snap2"));
4248 assert!(!body.contains("snap1"));
4249
4250 let list_all = request("DescribeDBSnapshots", &[]);
4251 let body = body_of(svc.describe_db_snapshots(&list_all).unwrap());
4252 assert!(body.contains("snap1"));
4253 assert!(body.contains("snap2"));
4254 }
4255
4256 #[test]
4257 fn describe_db_snapshots_unknown_id_errors() {
4258 let svc = make_service();
4259 let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "ghost")]);
4260 assert_code(svc.describe_db_snapshots(&req), "DBSnapshotNotFound");
4261 }
4262
4263 #[test]
4266 fn describe_db_instances_not_found() {
4267 let svc = make_service();
4268 let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
4269 assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
4270 }
4271
4272 #[tokio::test]
4273 async fn delete_db_instance_not_found() {
4274 let svc = make_service();
4275 let req = request(
4276 "DeleteDBInstance",
4277 &[
4278 ("DBInstanceIdentifier", "ghost"),
4279 ("SkipFinalSnapshot", "true"),
4280 ],
4281 );
4282 assert_code(svc.delete_db_instance(&req).await, "DBInstanceNotFound");
4283 }
4284
4285 #[test]
4286 fn modify_db_instance_not_found() {
4287 let svc = make_service();
4288 let req = request(
4289 "ModifyDBInstance",
4290 &[
4291 ("DBInstanceIdentifier", "ghost"),
4292 ("AllocatedStorage", "20"),
4293 ],
4294 );
4295 assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
4297 }
4298
4299 #[tokio::test]
4300 async fn reboot_db_instance_not_found() {
4301 let svc = make_service();
4302 let req = request("RebootDBInstance", &[("DBInstanceIdentifier", "ghost")]);
4303 assert_code(svc.reboot_db_instance(&req).await, "DBInstanceNotFound");
4304 }
4305
4306 #[tokio::test]
4307 async fn create_db_snapshot_instance_not_found() {
4308 let svc = make_service();
4309 let req = request(
4310 "CreateDBSnapshot",
4311 &[
4312 ("DBInstanceIdentifier", "ghost"),
4313 ("DBSnapshotIdentifier", "snap1"),
4314 ],
4315 );
4316 assert_code(svc.create_db_snapshot(&req).await, "InvalidParameterValue");
4317 }
4318
4319 #[tokio::test]
4320 async fn restore_db_instance_snapshot_not_found() {
4321 let svc = make_service();
4322 let req = request(
4323 "RestoreDBInstanceFromDBSnapshot",
4324 &[
4325 ("DBInstanceIdentifier", "restored"),
4326 ("DBSnapshotIdentifier", "ghost-snap"),
4327 ],
4328 );
4329 assert_code(
4330 svc.restore_db_instance_from_db_snapshot(&req).await,
4331 "InvalidParameterValue",
4332 );
4333 }
4334
4335 #[tokio::test]
4336 async fn create_db_instance_read_replica_source_not_found() {
4337 let svc = make_service();
4338 let req = request(
4339 "CreateDBInstanceReadReplica",
4340 &[
4341 ("DBInstanceIdentifier", "replica"),
4342 ("SourceDBInstanceIdentifier", "ghost"),
4343 ],
4344 );
4345 assert_code(
4346 svc.create_db_instance_read_replica(&req).await,
4347 "InvalidParameterValue",
4348 );
4349 }
4350
4351 #[test]
4352 fn describe_db_engine_versions_basic() {
4353 let svc = make_service();
4354 let req = request("DescribeDBEngineVersions", &[]);
4355 let resp = svc.describe_db_engine_versions(&req).unwrap();
4356 let body = body_of(resp);
4357 assert!(body.contains("<DBEngineVersions>"));
4358 }
4359
4360 #[test]
4361 fn describe_orderable_db_instance_options_basic() {
4362 let svc = make_service();
4363 let req = request("DescribeOrderableDBInstanceOptions", &[("Engine", "mysql")]);
4364 let resp = svc.describe_orderable_db_instance_options(&req).unwrap();
4365 let body = body_of(resp);
4366 assert!(body.contains("<OrderableDBInstanceOptions>"));
4367 }
4368
4369 #[test]
4370 fn describe_db_parameter_group_not_found() {
4371 let svc = make_service();
4372 let req = request(
4373 "DescribeDBParameterGroups",
4374 &[("DBParameterGroupName", "ghost")],
4375 );
4376 assert_code(
4377 svc.describe_db_parameter_groups(&req),
4378 "DBParameterGroupNotFound",
4379 );
4380 }
4381
4382 #[test]
4383 fn delete_db_parameter_group_not_found() {
4384 let svc = make_service();
4385 let req = request(
4386 "DeleteDBParameterGroup",
4387 &[("DBParameterGroupName", "ghost")],
4388 );
4389 assert_code(
4390 svc.delete_db_parameter_group(&req),
4391 "DBParameterGroupNotFound",
4392 );
4393 }
4394
4395 #[test]
4396 fn describe_db_subnet_group_not_found() {
4397 let svc = make_service();
4398 let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4399 assert_code(
4400 svc.describe_db_subnet_groups(&req),
4401 "DBSubnetGroupNotFoundFault",
4402 );
4403 }
4404
4405 #[test]
4406 fn delete_db_subnet_group_not_found() {
4407 let svc = make_service();
4408 let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
4409 assert_code(
4410 svc.delete_db_subnet_group(&req),
4411 "DBSubnetGroupNotFoundFault",
4412 );
4413 }
4414
4415 #[test]
4416 fn add_tags_resource_not_found() {
4417 let svc = make_service();
4418 let req = request(
4419 "AddTagsToResource",
4420 &[
4421 ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4422 ("Tags.member.1.Key", "k"),
4423 ("Tags.member.1.Value", "v"),
4424 ],
4425 );
4426 assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
4427 }
4428
4429 #[test]
4430 fn list_tags_resource_not_found() {
4431 let svc = make_service();
4432 let req = request(
4433 "ListTagsForResource",
4434 &[("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost")],
4435 );
4436 assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
4437 }
4438
4439 #[tokio::test]
4442 async fn create_db_snapshot_missing_id_errors() {
4443 let svc = make_service();
4444 let req = request(
4445 "CreateDBSnapshot",
4446 &[("DBInstanceIdentifier", "nonexistent")],
4447 );
4448 assert_code(svc.create_db_snapshot(&req).await, "MissingParameter");
4449 }
4450
4451 #[tokio::test]
4452 async fn create_db_snapshot_unknown_instance_errors() {
4453 let svc = make_service();
4454 let req = request(
4455 "CreateDBSnapshot",
4456 &[
4457 ("DBSnapshotIdentifier", "snap1"),
4458 ("DBInstanceIdentifier", "ghost"),
4459 ],
4460 );
4461 assert!(svc.create_db_snapshot(&req).await.is_err());
4462 }
4463
4464 #[tokio::test]
4467 async fn delete_db_instance_missing_id_errors() {
4468 let svc = make_service();
4469 let req = request("DeleteDBInstance", &[]);
4470 assert_code(svc.delete_db_instance(&req).await, "MissingParameter");
4471 }
4472
4473 #[tokio::test]
4476 async fn reboot_db_instance_missing_id_errors() {
4477 let svc = make_service();
4478 let req = request("RebootDBInstance", &[]);
4479 assert_code(svc.reboot_db_instance(&req).await, "MissingParameter");
4480 }
4481
4482 #[tokio::test]
4485 async fn create_db_instance_missing_id_errors() {
4486 let svc = make_service();
4487 let req = request(
4488 "CreateDBInstance",
4489 &[
4490 ("Engine", "postgres"),
4491 ("DBInstanceClass", "db.t3.micro"),
4492 ("AllocatedStorage", "20"),
4493 ("MasterUsername", "admin"),
4494 ("MasterUserPassword", "secretpass"),
4495 ],
4496 );
4497 assert!(svc.create_db_instance(&req).await.is_err());
4498 }
4499
4500 #[tokio::test]
4501 async fn create_db_instance_unsupported_engine_errors() {
4502 let svc = make_service();
4503 let req = request(
4504 "CreateDBInstance",
4505 &[
4506 ("DBInstanceIdentifier", "db1"),
4507 ("Engine", "mongodb"),
4508 ("DBInstanceClass", "db.t3.micro"),
4509 ("AllocatedStorage", "20"),
4510 ("MasterUsername", "admin"),
4511 ("MasterUserPassword", "secretpass"),
4512 ],
4513 );
4514 assert!(svc.create_db_instance(&req).await.is_err());
4515 }
4516
4517 #[tokio::test]
4520 async fn restore_db_instance_missing_ids_errors() {
4521 let svc = make_service();
4522 let req = request("RestoreDBInstanceFromDBSnapshot", &[]);
4523 assert!(svc
4524 .restore_db_instance_from_db_snapshot(&req)
4525 .await
4526 .is_err());
4527 }
4528
4529 #[tokio::test]
4530 async fn restore_db_instance_unknown_snapshot_errors() {
4531 let svc = make_service();
4532 let req = request(
4533 "RestoreDBInstanceFromDBSnapshot",
4534 &[
4535 ("DBInstanceIdentifier", "restored"),
4536 ("DBSnapshotIdentifier", "missing"),
4537 ],
4538 );
4539 assert!(svc
4540 .restore_db_instance_from_db_snapshot(&req)
4541 .await
4542 .is_err());
4543 }
4544
4545 #[tokio::test]
4548 async fn create_read_replica_missing_source_errors() {
4549 let svc = make_service();
4550 let req = request(
4551 "CreateDBInstanceReadReplica",
4552 &[("DBInstanceIdentifier", "replica1")],
4553 );
4554 assert!(svc.create_db_instance_read_replica(&req).await.is_err());
4555 }
4556
4557 #[tokio::test]
4558 async fn create_read_replica_unknown_source_errors() {
4559 let svc = make_service();
4560 let req = request(
4561 "CreateDBInstanceReadReplica",
4562 &[
4563 ("DBInstanceIdentifier", "replica1"),
4564 ("SourceDBInstanceIdentifier", "ghost"),
4565 ],
4566 );
4567 assert!(svc.create_db_instance_read_replica(&req).await.is_err());
4568 }
4569
4570 #[test]
4573 fn describe_db_snapshots_by_snapshot_id_only() {
4574 let svc = make_service();
4575 seed_snapshot(&svc, "s1", "inst1");
4576 let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "s1")]);
4577 let resp = svc.describe_db_snapshots(&req).unwrap();
4578 let b = body_of(resp);
4579 assert!(b.contains("<DBSnapshotIdentifier>s1</DBSnapshotIdentifier>"));
4580 }
4581
4582 #[test]
4583 fn describe_db_snapshots_by_instance_id_returns_matching() {
4584 let svc = make_service();
4585 seed_snapshot(&svc, "s1", "inst1");
4586 seed_snapshot(&svc, "s2", "inst2");
4587 let req = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "inst1")]);
4588 let resp = svc.describe_db_snapshots(&req).unwrap();
4589 let b = body_of(resp);
4590 assert!(b.contains("s1"));
4591 assert!(!b.contains("<DBSnapshotIdentifier>s2</DBSnapshotIdentifier>"));
4592 }
4593
4594 #[test]
4597 fn modify_db_parameter_group_missing_name() {
4598 let svc = make_service();
4599 let req = request("ModifyDBParameterGroup", &[]);
4600 assert!(svc.modify_db_parameter_group(&req).is_err());
4601 }
4602
4603 #[test]
4606 fn modify_db_subnet_group_unknown_errors() {
4607 let svc = make_service();
4608 let req = request(
4609 "ModifyDBSubnetGroup",
4610 &[
4611 ("DBSubnetGroupName", "ghost"),
4612 ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4613 ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4614 ],
4615 );
4616 assert!(svc.modify_db_subnet_group(&req).is_err());
4617 }
4618
4619 #[test]
4622 fn describe_db_instances_empty_returns_xml() {
4623 let svc = make_service();
4624 let req = request("DescribeDBInstances", &[]);
4625 let resp = svc.describe_db_instances(&req).unwrap();
4626 let b = body_of(resp);
4627 assert!(b.contains("DescribeDBInstancesResult"));
4628 }
4629
4630 #[test]
4631 fn describe_db_snapshots_empty_returns_empty_list() {
4632 let svc = make_service();
4633 let req = request("DescribeDBSnapshots", &[]);
4634 let resp = svc.describe_db_snapshots(&req).unwrap();
4635 let b = body_of(resp);
4636 assert!(b.contains("DescribeDBSnapshotsResult"));
4637 }
4638
4639 #[test]
4640 fn add_tags_unknown_resource_errors() {
4641 let svc = make_service();
4642 let req = request(
4643 "AddTagsToResource",
4644 &[
4645 ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4646 ("Tags.member.1.Key", "k"),
4647 ("Tags.member.1.Value", "v"),
4648 ],
4649 );
4650 assert!(svc.add_tags_to_resource(&req).is_err());
4651 }
4652
4653 #[test]
4654 fn remove_tags_unknown_resource_errors() {
4655 let svc = make_service();
4656 let req = request(
4657 "RemoveTagsFromResource",
4658 &[
4659 ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4660 ("TagKeys.member.1", "k"),
4661 ],
4662 );
4663 assert!(svc.remove_tags_from_resource(&req).is_err());
4664 }
4665
4666 #[test]
4667 fn create_db_parameter_group_missing_name_errors() {
4668 let svc = make_service();
4669 let req = request(
4670 "CreateDBParameterGroup",
4671 &[
4672 ("DBParameterGroupFamily", "postgres16"),
4673 ("Description", "d"),
4674 ],
4675 );
4676 assert!(svc.create_db_parameter_group(&req).is_err());
4677 }
4678
4679 #[test]
4680 fn create_db_subnet_group_missing_desc_errors() {
4681 let svc = make_service();
4682 let req = request(
4683 "CreateDBSubnetGroup",
4684 &[
4685 ("DBSubnetGroupName", "sg1"),
4686 ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4687 ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4688 ],
4689 );
4690 assert!(svc.create_db_subnet_group(&req).is_err());
4691 }
4692
4693 #[tokio::test]
4694 async fn create_db_instance_missing_class_errors() {
4695 let svc = make_service();
4696 let req = request(
4697 "CreateDBInstance",
4698 &[
4699 ("DBInstanceIdentifier", "miss-class"),
4700 ("Engine", "postgres"),
4701 ("AllocatedStorage", "20"),
4702 ("MasterUsername", "admin"),
4703 ("MasterUserPassword", "secretpass"),
4704 ],
4705 );
4706 assert!(svc.create_db_instance(&req).await.is_err());
4707 }
4708
4709 #[tokio::test]
4710 async fn create_db_instance_missing_master_username_errors() {
4711 let svc = make_service();
4712 let req = request(
4713 "CreateDBInstance",
4714 &[
4715 ("DBInstanceIdentifier", "miss-mu"),
4716 ("Engine", "postgres"),
4717 ("DBInstanceClass", "db.t3.micro"),
4718 ("AllocatedStorage", "20"),
4719 ("MasterUserPassword", "secretpass"),
4720 ],
4721 );
4722 assert!(svc.create_db_instance(&req).await.is_err());
4723 }
4724
4725 #[test]
4726 fn modify_db_instance_missing_id_errors() {
4727 let svc = make_service();
4728 let req = request("ModifyDBInstance", &[]);
4729 assert!(svc.modify_db_instance(&req).is_err());
4730 }
4731
4732 #[test]
4733 fn modify_db_parameter_group_unknown_pg_errors() {
4734 let svc = make_service();
4735 let req = request(
4736 "ModifyDBParameterGroup",
4737 &[
4738 ("DBParameterGroupName", "ghost"),
4739 ("Parameters.member.1.ParameterName", "p"),
4740 ("Parameters.member.1.ParameterValue", "v"),
4741 ("Parameters.member.1.ApplyMethod", "immediate"),
4742 ],
4743 );
4744 assert!(svc.modify_db_parameter_group(&req).is_err());
4745 }
4746
4747 #[test]
4748 fn describe_db_parameter_groups_unknown_errors() {
4749 let svc = make_service();
4750 let req = request(
4751 "DescribeDBParameterGroups",
4752 &[("DBParameterGroupName", "ghost")],
4753 );
4754 assert!(svc.describe_db_parameter_groups(&req).is_err());
4755 }
4756
4757 #[test]
4758 fn describe_db_subnet_groups_unknown_errors() {
4759 let svc = make_service();
4760 let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4761 assert!(svc.describe_db_subnet_groups(&req).is_err());
4762 }
4763}