1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_persistence::SnapshotStore;
14
15use crate::runtime::{RdsRuntime, RuntimeError};
16use crate::state::{
17 default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
18 DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
19 SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
20};
21
22const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
23
24fn is_mutating_action(action: &str) -> bool {
25 if matches!(
26 action,
27 "AddTagsToResource"
28 | "CreateDBInstance"
29 | "CreateDBInstanceReadReplica"
30 | "CreateDBParameterGroup"
31 | "CreateDBSnapshot"
32 | "CreateDBSubnetGroup"
33 | "DeleteDBInstance"
34 | "DeleteDBParameterGroup"
35 | "DeleteDBSnapshot"
36 | "DeleteDBSubnetGroup"
37 | "ModifyDBInstance"
38 | "ModifyDBParameterGroup"
39 | "ModifyDBSubnetGroup"
40 | "RebootDBInstance"
41 | "RemoveTagsFromResource"
42 | "RestoreDBInstanceFromDBSnapshot"
43 ) {
44 return true;
45 }
46 let mutating_prefixes = [
48 "Create",
49 "Modify",
50 "Delete",
51 "Reboot",
52 "Start",
53 "Stop",
54 "Failover",
55 "Switchover",
56 "Promote",
57 "Reset",
58 "Apply",
59 "Authorize",
60 "Revoke",
61 "Add",
62 "Remove",
63 "Register",
64 "Deregister",
65 "Copy",
66 "Restore",
67 "Backtrack",
68 "Cancel",
69 "Purchase",
70 "Disable",
71 "Enable",
72 ];
73 mutating_prefixes.iter().any(|p| action.starts_with(p))
74}
75const SUPPORTED_ACTIONS: &[&str] = &[
76 "AddRoleToDBCluster",
77 "AddRoleToDBInstance",
78 "AddSourceIdentifierToSubscription",
79 "AddTagsToResource",
80 "ApplyPendingMaintenanceAction",
81 "AuthorizeDBSecurityGroupIngress",
82 "BacktrackDBCluster",
83 "CancelExportTask",
84 "CopyDBClusterParameterGroup",
85 "CopyDBClusterSnapshot",
86 "CopyDBParameterGroup",
87 "CopyDBSnapshot",
88 "CopyOptionGroup",
89 "CreateBlueGreenDeployment",
90 "CreateCustomDBEngineVersion",
91 "CreateDBCluster",
92 "CreateDBClusterEndpoint",
93 "CreateDBClusterParameterGroup",
94 "CreateDBClusterSnapshot",
95 "CreateDBInstance",
96 "CreateDBInstanceReadReplica",
97 "CreateDBParameterGroup",
98 "CreateDBProxy",
99 "CreateDBProxyEndpoint",
100 "CreateDBSecurityGroup",
101 "CreateDBShardGroup",
102 "CreateDBSnapshot",
103 "CreateDBSubnetGroup",
104 "CreateEventSubscription",
105 "CreateGlobalCluster",
106 "CreateIntegration",
107 "CreateOptionGroup",
108 "CreateTenantDatabase",
109 "DeleteBlueGreenDeployment",
110 "DeleteCustomDBEngineVersion",
111 "DeleteDBCluster",
112 "DeleteDBClusterAutomatedBackup",
113 "DeleteDBClusterEndpoint",
114 "DeleteDBClusterParameterGroup",
115 "DeleteDBClusterSnapshot",
116 "DeleteDBInstance",
117 "DeleteDBInstanceAutomatedBackup",
118 "DeleteDBParameterGroup",
119 "DeleteDBProxy",
120 "DeleteDBProxyEndpoint",
121 "DeleteDBSecurityGroup",
122 "DeleteDBShardGroup",
123 "DeleteDBSnapshot",
124 "DeleteDBSubnetGroup",
125 "DeleteEventSubscription",
126 "DeleteGlobalCluster",
127 "DeleteIntegration",
128 "DeleteOptionGroup",
129 "DeleteTenantDatabase",
130 "DeregisterDBProxyTargets",
131 "DescribeAccountAttributes",
132 "DescribeBlueGreenDeployments",
133 "DescribeCertificates",
134 "DescribeDBClusterAutomatedBackups",
135 "DescribeDBClusterBacktracks",
136 "DescribeDBClusterEndpoints",
137 "DescribeDBClusterParameterGroups",
138 "DescribeDBClusterParameters",
139 "DescribeDBClusterSnapshotAttributes",
140 "DescribeDBClusterSnapshots",
141 "DescribeDBClusters",
142 "DescribeDBEngineVersions",
143 "DescribeDBInstanceAutomatedBackups",
144 "DescribeDBInstances",
145 "DescribeDBLogFiles",
146 "DescribeDBMajorEngineVersions",
147 "DescribeDBParameterGroups",
148 "DescribeDBParameters",
149 "DescribeDBProxies",
150 "DescribeDBProxyEndpoints",
151 "DescribeDBProxyTargetGroups",
152 "DescribeDBProxyTargets",
153 "DescribeDBRecommendations",
154 "DescribeDBSecurityGroups",
155 "DescribeDBShardGroups",
156 "DescribeDBSnapshotAttributes",
157 "DescribeDBSnapshotTenantDatabases",
158 "DescribeDBSnapshots",
159 "DescribeDBSubnetGroups",
160 "DescribeEngineDefaultClusterParameters",
161 "DescribeEngineDefaultParameters",
162 "DescribeEventCategories",
163 "DescribeEventSubscriptions",
164 "DescribeEvents",
165 "DescribeExportTasks",
166 "DescribeGlobalClusters",
167 "DescribeIntegrations",
168 "DescribeOptionGroupOptions",
169 "DescribeOptionGroups",
170 "DescribeOrderableDBInstanceOptions",
171 "DescribePendingMaintenanceActions",
172 "DescribeReservedDBInstances",
173 "DescribeReservedDBInstancesOfferings",
174 "DescribeSourceRegions",
175 "DescribeTenantDatabases",
176 "DescribeValidDBInstanceModifications",
177 "DisableHttpEndpoint",
178 "DownloadDBLogFilePortion",
179 "EnableHttpEndpoint",
180 "FailoverDBCluster",
181 "FailoverGlobalCluster",
182 "ListTagsForResource",
183 "ModifyActivityStream",
184 "ModifyCertificates",
185 "ModifyCurrentDBClusterCapacity",
186 "ModifyCustomDBEngineVersion",
187 "ModifyDBCluster",
188 "ModifyDBClusterEndpoint",
189 "ModifyDBClusterParameterGroup",
190 "ModifyDBClusterSnapshotAttribute",
191 "ModifyDBInstance",
192 "ModifyDBParameterGroup",
193 "ModifyDBProxy",
194 "ModifyDBProxyEndpoint",
195 "ModifyDBProxyTargetGroup",
196 "ModifyDBRecommendation",
197 "ModifyDBShardGroup",
198 "ModifyDBSnapshot",
199 "ModifyDBSnapshotAttribute",
200 "ModifyDBSubnetGroup",
201 "ModifyEventSubscription",
202 "ModifyGlobalCluster",
203 "ModifyIntegration",
204 "ModifyOptionGroup",
205 "ModifyTenantDatabase",
206 "PromoteReadReplica",
207 "PromoteReadReplicaDBCluster",
208 "PurchaseReservedDBInstancesOffering",
209 "RebootDBCluster",
210 "RebootDBInstance",
211 "RebootDBShardGroup",
212 "RegisterDBProxyTargets",
213 "RemoveFromGlobalCluster",
214 "RemoveRoleFromDBCluster",
215 "RemoveRoleFromDBInstance",
216 "RemoveSourceIdentifierFromSubscription",
217 "RemoveTagsFromResource",
218 "ResetDBClusterParameterGroup",
219 "ResetDBParameterGroup",
220 "RestoreDBClusterFromS3",
221 "RestoreDBClusterFromSnapshot",
222 "RestoreDBClusterToPointInTime",
223 "RestoreDBInstanceFromDBSnapshot",
224 "RestoreDBInstanceFromS3",
225 "RestoreDBInstanceToPointInTime",
226 "RevokeDBSecurityGroupIngress",
227 "StartActivityStream",
228 "StartDBCluster",
229 "StartDBInstance",
230 "StartDBInstanceAutomatedBackupsReplication",
231 "StartExportTask",
232 "StopActivityStream",
233 "StopDBCluster",
234 "StopDBInstance",
235 "StopDBInstanceAutomatedBackupsReplication",
236 "SwitchoverBlueGreenDeployment",
237 "SwitchoverGlobalCluster",
238 "SwitchoverReadReplica",
239];
240
241pub struct RdsService {
242 pub(crate) state: SharedRdsState,
243 runtime: Option<Arc<RdsRuntime>>,
244 snapshot_store: Option<Arc<dyn SnapshotStore>>,
245 snapshot_lock: Arc<AsyncMutex<()>>,
246 pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
247}
248
249#[derive(Clone, Copy)]
251#[allow(dead_code, clippy::enum_variant_names)]
252pub(crate) enum RdsSourceType {
253 DbInstance,
254 DbSnapshot,
255 DbParameterGroup,
256}
257
258impl RdsSourceType {
259 fn as_str(self) -> &'static str {
260 match self {
261 Self::DbInstance => "DB_INSTANCE",
262 Self::DbSnapshot => "DB_SNAPSHOT",
263 Self::DbParameterGroup => "DB_PARAMETER_GROUP",
264 }
265 }
266
267 fn detail_type(self) -> &'static str {
268 match self {
269 Self::DbInstance => "RDS DB Instance Event",
270 Self::DbSnapshot => "RDS DB Snapshot Event",
271 Self::DbParameterGroup => "RDS DB Parameter Group Event",
272 }
273 }
274}
275
276impl RdsService {
277 pub(crate) fn state_handle(&self) -> &SharedRdsState {
278 &self.state
279 }
280}
281
282impl RdsService {
283 pub fn new(state: SharedRdsState) -> Self {
284 Self {
285 state,
286 runtime: None,
287 snapshot_store: None,
288 snapshot_lock: Arc::new(AsyncMutex::new(())),
289 delivery_bus: None,
290 }
291 }
292
293 pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
294 self.runtime = Some(runtime);
295 self
296 }
297
298 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
299 self.snapshot_store = Some(store);
300 self
301 }
302
303 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
304 self.delivery_bus = Some(bus);
305 self
306 }
307
308 pub(crate) fn emit_event(
311 &self,
312 source_type: RdsSourceType,
313 source_identifier: &str,
314 source_arn: &str,
315 event_id: &str,
316 event_categories: &[&str],
317 message: &str,
318 ) {
319 emit_event_static(
320 self.delivery_bus.as_ref(),
321 source_type,
322 source_identifier,
323 source_arn,
324 event_id,
325 event_categories,
326 message,
327 );
328 }
329
330 async fn save_snapshot(&self) {
331 let Some(store) = self.snapshot_store.clone() else {
332 return;
333 };
334 let _guard = self.snapshot_lock.lock().await;
335 let snapshot = RdsSnapshot {
336 schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
337 state: None,
338 accounts: Some(self.state.read().clone()),
339 };
340 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
341 let bytes = serde_json::to_vec(&snapshot)
342 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
343 store.save(&bytes)
344 })
345 .await;
346 match join {
347 Ok(Ok(())) => {}
348 Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
349 Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
350 }
351 }
352
353 fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
359 self.runtime.as_ref().ok_or_else(|| {
360 AwsServiceError::aws_error(
361 StatusCode::SERVICE_UNAVAILABLE,
362 "InvalidParameterValue",
363 "Docker/Podman is required for RDS DB instances but is not available",
364 )
365 })
366 }
367}
368
369#[async_trait]
370impl AwsService for RdsService {
371 fn service_name(&self) -> &str {
372 "rds"
373 }
374
375 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
376 let mutates = is_mutating_action(request.action.as_str());
377 let result = match request.action.as_str() {
378 "AddTagsToResource" => self.add_tags_to_resource(&request),
379 "CreateDBInstance" => self.create_db_instance(&request).await,
380 "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
381 "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
382 "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
383 "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
384 "DeleteDBInstance" => self.delete_db_instance(&request).await,
385 "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
386 "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
387 "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
388 "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
389 "DescribeDBInstances" => self.describe_db_instances(&request),
390 "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
391 "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
392 "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
393 "DescribeOrderableDBInstanceOptions" => {
394 self.describe_orderable_db_instance_options(&request)
395 }
396 "ListTagsForResource" => self.list_tags_for_resource(&request),
397 "ModifyDBInstance" => self.modify_db_instance(&request),
398 "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
399 "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
400 "RebootDBInstance" => self.reboot_db_instance(&request).await,
401 "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
402 "RestoreDBInstanceFromDBSnapshot" => {
403 self.restore_db_instance_from_db_snapshot(&request).await
404 }
405 _ => self.handle_extra_action(&request),
406 };
407 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
408 self.save_snapshot().await;
409 }
410 result
411 }
412
413 fn supported_actions(&self) -> &[&str] {
414 SUPPORTED_ACTIONS
415 }
416}
417
418impl RdsService {
419 async fn create_db_instance(
420 &self,
421 request: &AwsRequest,
422 ) -> Result<AwsResponse, AwsServiceError> {
423 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
424 let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
425 let db_instance_class = required_param(request, "DBInstanceClass")?;
426 let engine = required_param(request, "Engine")?;
427 let master_username = required_param(request, "MasterUsername")?;
428 let master_user_password = required_param(request, "MasterUserPassword")?;
429 let db_name = optional_param(request, "DBName");
430 let engine_version =
431 optional_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
432 let publicly_accessible =
433 parse_optional_bool(optional_param(request, "PubliclyAccessible").as_deref())?
434 .unwrap_or(true);
435 let deletion_protection =
436 parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?
437 .unwrap_or(false);
438 let port = optional_i32_param(request, "Port")?
439 .unwrap_or_else(|| default_port_for_engine(&engine));
440 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
441
442 let db_parameter_group_name = optional_param(request, "DBParameterGroupName")
443 .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
444
445 let backup_retention_period =
446 optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
447 let preferred_backup_window = optional_param(request, "PreferredBackupWindow")
448 .unwrap_or_else(|| "03:00-04:00".to_string());
449 let option_group_name = optional_param(request, "OptionGroupName");
450 let multi_az =
451 parse_optional_bool(optional_param(request, "MultiAZ").as_deref())?.unwrap_or(false);
452
453 validate_create_request(
454 &db_instance_identifier,
455 allocated_storage,
456 &db_instance_class,
457 &engine,
458 &engine_version,
459 port,
460 )?;
461
462 {
463 let mut accounts = self.state.write();
464 let state = accounts.get_or_create(&request.account_id);
465 if !state.begin_instance_creation(&db_instance_identifier) {
466 return Err(AwsServiceError::aws_error(
467 StatusCode::BAD_REQUEST,
468 "DBInstanceAlreadyExists",
469 format!("DBInstance {} already exists.", db_instance_identifier),
470 ));
471 }
472 if let Some(ref pg_name) = db_parameter_group_name {
474 if !state.parameter_groups.contains_key(pg_name) {
475 state.cancel_instance_creation(&db_instance_identifier);
476 return Err(AwsServiceError::aws_error(
477 StatusCode::NOT_FOUND,
478 "DBParameterGroupNotFound",
479 format!("DBParameterGroup {} not found.", pg_name),
480 ));
481 }
482 }
483 }
484
485 let runtime = self.require_runtime()?.clone();
486
487 let logical_db_name = db_name
488 .clone()
489 .unwrap_or_else(|| default_db_name(&engine).to_string());
490
491 let created_at = Utc::now();
497 let instance = {
498 let mut accounts = self.state.write();
499 let state = accounts.get_or_create(&request.account_id);
500 let placeholder = DbInstance {
501 db_instance_identifier: db_instance_identifier.clone(),
502 db_instance_arn: state.db_instance_arn(&db_instance_identifier),
503 db_instance_class: db_instance_class.clone(),
504 engine: engine.clone(),
505 engine_version: engine_version.clone(),
506 db_instance_status: "creating".to_string(),
507 master_username: master_username.clone(),
508 db_name: db_name.clone(),
509 endpoint_address: String::new(),
510 port: 0,
511 allocated_storage,
512 publicly_accessible,
513 deletion_protection,
514 created_at,
515 dbi_resource_id: state.next_dbi_resource_id(),
516 master_user_password: master_user_password.clone(),
517 container_id: String::new(),
518 host_port: 0,
519 tags: Vec::new(),
520 read_replica_source_db_instance_identifier: None,
521 read_replica_db_instance_identifiers: Vec::new(),
522 vpc_security_group_ids,
523 db_parameter_group_name,
524 backup_retention_period,
525 preferred_backup_window,
526 latest_restorable_time: if backup_retention_period > 0 {
527 Some(created_at)
528 } else {
529 None
530 },
531 option_group_name,
532 multi_az,
533 pending_modified_values: None,
534 };
535 state.finish_instance_creation(placeholder.clone());
536 placeholder
537 };
538 let instance_arn = instance.db_instance_arn.clone();
539
540 self.emit_event(
541 RdsSourceType::DbInstance,
542 &db_instance_identifier,
543 &instance_arn,
544 "RDS-EVENT-0005",
545 &["creation"],
546 "DB instance created",
547 );
548
549 {
550 let state_handle = self.state.clone();
551 let delivery_bus = self.delivery_bus.clone();
552 let runtime = runtime.clone();
553 let id = db_instance_identifier.clone();
554 let engine = engine.clone();
555 let engine_version = engine_version.clone();
556 let master_username = master_username.clone();
557 let master_user_password = master_user_password.clone();
558 let account_id = request.account_id.clone();
559 let region = request.region.clone();
560 let arn = instance_arn.clone();
561 tokio::spawn(async move {
562 match runtime
563 .ensure_postgres(
564 &id,
565 &engine,
566 &engine_version,
567 &master_username,
568 &master_user_password,
569 &logical_db_name,
570 &account_id,
571 ®ion,
572 )
573 .await
574 {
575 Ok(running) => {
576 let mut accounts = state_handle.write();
577 let state = accounts.get_or_create(&account_id);
578 if let Some(inst) = state.instances.get_mut(&id) {
579 inst.db_instance_status = "available".to_string();
580 inst.endpoint_address = "127.0.0.1".to_string();
581 inst.port = i32::from(running.host_port);
582 inst.host_port = running.host_port;
583 inst.container_id = running.container_id;
584 }
585 }
586 Err(error) => {
587 tracing::error!(%error, db_instance_identifier=%id, "create_db_instance background task failed");
588 {
589 let mut accounts = state_handle.write();
590 let state = accounts.get_or_create(&account_id);
591 state.instances.remove(&id);
592 }
593 emit_event_static(
594 delivery_bus.as_ref(),
595 RdsSourceType::DbInstance,
596 &id,
597 &arn,
598 "RDS-EVENT-0058",
599 &["failure"],
600 &format!("DB instance failed to create: {}", error),
601 );
602 }
603 }
604 });
605 }
606
607 Ok(AwsResponse::xml(
608 StatusCode::OK,
609 xml_wrap(
610 "CreateDBInstance",
611 &format!(
612 "<DBInstance>{}</DBInstance>",
613 db_instance_xml(&instance, None)
614 ),
615 &request.request_id,
616 ),
617 ))
618 }
619
620 async fn delete_db_instance(
621 &self,
622 request: &AwsRequest,
623 ) -> Result<AwsResponse, AwsServiceError> {
624 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
625 let skip_final_snapshot =
626 parse_optional_bool(optional_param(request, "SkipFinalSnapshot").as_deref())?
627 .unwrap_or(false);
628 let final_db_snapshot_identifier = optional_param(request, "FinalDBSnapshotIdentifier");
629
630 if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
631 return Err(AwsServiceError::aws_error(
632 StatusCode::BAD_REQUEST,
633 "InvalidParameterCombination",
634 "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
635 ));
636 }
637 if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
638 return Err(AwsServiceError::aws_error(
639 StatusCode::BAD_REQUEST,
640 "InvalidParameterCombination",
641 "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
642 ));
643 }
644
645 {
647 let accounts = self.state.read();
648 let empty = RdsState::new(&request.account_id, &request.region);
649 let state = accounts.get(&request.account_id).unwrap_or(&empty);
650 if let Some(instance) = state.instances.get(&db_instance_identifier) {
651 if instance.deletion_protection {
652 return Err(AwsServiceError::aws_error(
653 StatusCode::BAD_REQUEST,
654 "InvalidDBInstanceState",
655 format!(
656 "DBInstance {} cannot be deleted because deletion protection is enabled.",
657 db_instance_identifier
658 ),
659 ));
660 }
661 } else {
662 return Err(db_instance_not_found(&db_instance_identifier));
663 }
664 }
665
666 if let Some(ref snapshot_id) = final_db_snapshot_identifier {
667 self.create_final_db_snapshot(
668 &db_instance_identifier,
669 snapshot_id,
670 &request.account_id,
671 &request.region,
672 )
673 .await?;
674 }
675
676 let instance = {
677 let mut accounts = self.state.write();
678 let state = accounts.get_or_create(&request.account_id);
679 let instance = state
680 .instances
681 .remove(&db_instance_identifier)
682 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
683
684 if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
685 if let Some(source) = state.instances.get_mut(source_id) {
686 source
687 .read_replica_db_instance_identifiers
688 .retain(|id| id != &db_instance_identifier);
689 }
690 }
691
692 for replica_id in &instance.read_replica_db_instance_identifiers {
693 if let Some(replica) = state.instances.get_mut(replica_id) {
694 replica.read_replica_source_db_instance_identifier = None;
695 }
696 }
697
698 instance
699 };
700
701 if let Some(runtime) = &self.runtime {
702 runtime.stop_container(&db_instance_identifier).await;
703 }
704
705 self.emit_event(
706 RdsSourceType::DbInstance,
707 &db_instance_identifier,
708 &instance.db_instance_arn,
709 "RDS-EVENT-0003",
710 &["deletion"],
711 "DB instance deleted",
712 );
713
714 Ok(AwsResponse::xml(
715 StatusCode::OK,
716 xml_wrap(
717 "DeleteDBInstance",
718 &format!(
719 "<DBInstance>{}</DBInstance>",
720 db_instance_xml(&instance, Some("deleting"))
721 ),
722 &request.request_id,
723 ),
724 ))
725 }
726
727 async fn create_final_db_snapshot(
733 &self,
734 db_instance_identifier: &str,
735 snapshot_id: &str,
736 account_id: &str,
737 region: &str,
738 ) -> Result<(), AwsServiceError> {
739 let runtime = self.runtime.as_ref().ok_or_else(|| {
740 AwsServiceError::aws_error(
741 StatusCode::SERVICE_UNAVAILABLE,
742 "InvalidParameterValue",
743 "Docker/Podman is required for RDS snapshots but is not available",
744 )
745 })?;
746
747 let (instance_for_snapshot, db_name) = {
748 let accounts = self.state.read();
749 let empty = RdsState::new(account_id, region);
750 let state = accounts.get(account_id).unwrap_or(&empty);
751
752 if state.snapshots.contains_key(snapshot_id) {
753 return Err(AwsServiceError::aws_error(
754 StatusCode::CONFLICT,
755 "DBSnapshotAlreadyExists",
756 format!("DBSnapshot {snapshot_id} already exists."),
757 ));
758 }
759
760 let instance = state
761 .instances
762 .get(db_instance_identifier)
763 .cloned()
764 .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
765
766 let default_db = default_db_name(&instance.engine);
767 let db_name = instance
768 .db_name
769 .as_deref()
770 .unwrap_or(default_db)
771 .to_string();
772
773 (instance, db_name)
774 };
775
776 let dump_data = runtime
777 .dump_database(
778 db_instance_identifier,
779 &instance_for_snapshot.engine,
780 &instance_for_snapshot.master_username,
781 &instance_for_snapshot.master_user_password,
782 &db_name,
783 )
784 .await
785 .map_err(runtime_error_to_service_error)?;
786
787 let mut accounts = self.state.write();
788 let state = accounts.get_or_create(account_id);
789
790 if state.snapshots.contains_key(snapshot_id) {
791 return Err(AwsServiceError::aws_error(
792 StatusCode::CONFLICT,
793 "DBSnapshotAlreadyExists",
794 format!("DBSnapshot {snapshot_id} already exists."),
795 ));
796 }
797
798 let snapshot_arn = state.db_snapshot_arn(snapshot_id);
799
800 let snapshot = DbSnapshot {
801 db_snapshot_identifier: snapshot_id.to_string(),
802 db_snapshot_arn: snapshot_arn,
803 db_instance_identifier: db_instance_identifier.to_string(),
804 snapshot_create_time: Utc::now(),
805 engine: instance_for_snapshot.engine.clone(),
806 engine_version: instance_for_snapshot.engine_version.clone(),
807 allocated_storage: instance_for_snapshot.allocated_storage,
808 status: "available".to_string(),
809 port: instance_for_snapshot.port,
810 master_username: instance_for_snapshot.master_username.clone(),
811 db_name: instance_for_snapshot.db_name.clone(),
812 dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
813 snapshot_type: "manual".to_string(),
814 master_user_password: instance_for_snapshot.master_user_password.clone(),
815 tags: Vec::new(),
816 dump_data,
817 };
818
819 state.snapshots.insert(snapshot_id.to_string(), snapshot);
820 Ok(())
821 }
822
823 fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
824 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
825 let db_instance_class = optional_param(request, "DBInstanceClass");
826 let deletion_protection =
827 parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?;
828 let apply_immediately =
829 parse_optional_bool(optional_param(request, "ApplyImmediately").as_deref())?;
830
831 let vpc_security_group_ids = {
833 let mut ids = Vec::new();
834 for index in 1.. {
835 let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
836 match optional_param(request, &sg_id_name) {
837 Some(sg_id) => ids.push(sg_id),
838 None => break,
839 }
840 }
841 if ids.is_empty() {
842 None
843 } else {
844 Some(ids)
845 }
846 };
847
848 if db_instance_class.is_none()
849 && deletion_protection.is_none()
850 && vpc_security_group_ids.is_none()
851 {
852 return Err(AwsServiceError::aws_error(
853 StatusCode::BAD_REQUEST,
854 "InvalidParameterCombination",
855 "At least one supported mutable field must be provided.",
856 ));
857 }
858 if let Some(ref class) = db_instance_class {
859 validate_db_instance_class(class)?;
860 }
861
862 let mut accounts = self.state.write();
863 let state = accounts.get_or_create(&request.account_id);
864 let instance = state
865 .instances
866 .get_mut(&db_instance_identifier)
867 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
868
869 if apply_immediately == Some(false) {
871 let pending = instance
872 .pending_modified_values
873 .get_or_insert(Default::default());
874 if let Some(class) = db_instance_class {
875 pending.db_instance_class = Some(class);
876 }
877 if let Some(deletion_protection) = deletion_protection {
880 instance.deletion_protection = deletion_protection;
881 }
882 if let Some(security_group_ids) = vpc_security_group_ids {
883 instance.vpc_security_group_ids = security_group_ids;
884 }
885 } else {
886 if let Some(class) = db_instance_class {
888 instance.db_instance_class = class;
889 }
890 if let Some(deletion_protection) = deletion_protection {
891 instance.deletion_protection = deletion_protection;
892 }
893 if let Some(security_group_ids) = vpc_security_group_ids {
894 instance.vpc_security_group_ids = security_group_ids;
895 }
896 }
897 let instance_arn = instance.db_instance_arn.clone();
898 let xml = xml_wrap(
899 "ModifyDBInstance",
900 &format!(
901 "<DBInstance>{}</DBInstance>",
902 db_instance_xml(instance, Some("modifying"))
903 ),
904 &request.request_id,
905 );
906 drop(accounts);
907
908 self.emit_event(
909 RdsSourceType::DbInstance,
910 &db_instance_identifier,
911 &instance_arn,
912 "RDS-EVENT-0014",
913 &["configuration change"],
914 "DB instance was modified",
915 );
916
917 Ok(AwsResponse::xml(StatusCode::OK, xml))
918 }
919
920 async fn reboot_db_instance(
921 &self,
922 request: &AwsRequest,
923 ) -> Result<AwsResponse, AwsServiceError> {
924 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
925 let force_failover =
926 parse_optional_bool(optional_param(request, "ForceFailover").as_deref())?;
927 if force_failover == Some(true) {
928 return Err(AwsServiceError::aws_error(
929 StatusCode::BAD_REQUEST,
930 "InvalidParameterCombination",
931 "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
932 ));
933 }
934
935 let instance = {
936 let accounts = self.state.read();
937 let empty = RdsState::new(&request.account_id, &request.region);
938 let state = accounts.get(&request.account_id).unwrap_or(&empty);
939 state
940 .instances
941 .get(&db_instance_identifier)
942 .cloned()
943 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
944 };
945
946 let runtime = self.require_runtime()?;
947
948 let running = runtime
949 .restart_container(
950 &db_instance_identifier,
951 &instance.engine,
952 &instance.master_username,
953 &instance.master_user_password,
954 instance
955 .db_name
956 .as_deref()
957 .unwrap_or(default_db_name(&instance.engine)),
958 )
959 .await
960 .map_err(runtime_error_to_service_error)?;
961
962 let instance = {
963 let mut accounts = self.state.write();
964 let state = accounts.get_or_create(&request.account_id);
965 let instance = state
966 .instances
967 .get_mut(&db_instance_identifier)
968 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
969 instance.host_port = running.host_port;
970 instance.port = i32::from(running.host_port);
971
972 if let Some(pending) = instance.pending_modified_values.take() {
974 if let Some(class) = pending.db_instance_class {
975 instance.db_instance_class = class;
976 }
977 if let Some(allocated_storage) = pending.allocated_storage {
978 instance.allocated_storage = allocated_storage;
979 }
980 if let Some(backup_retention_period) = pending.backup_retention_period {
981 instance.backup_retention_period = backup_retention_period;
982 }
983 if let Some(multi_az) = pending.multi_az {
984 instance.multi_az = multi_az;
985 }
986 if let Some(engine_version) = pending.engine_version {
987 instance.engine_version = engine_version;
988 }
989 if let Some(master_user_password) = pending.master_user_password {
990 instance.master_user_password = master_user_password;
991 }
992 }
993
994 instance.clone()
995 };
996
997 self.emit_event(
998 RdsSourceType::DbInstance,
999 &db_instance_identifier,
1000 &instance.db_instance_arn,
1001 "RDS-EVENT-0006",
1002 &["availability"],
1003 "DB instance restarted",
1004 );
1005
1006 Ok(AwsResponse::xml(
1007 StatusCode::OK,
1008 xml_wrap(
1009 "RebootDBInstance",
1010 &format!(
1011 "<DBInstance>{}</DBInstance>",
1012 db_instance_xml(&instance, Some("rebooting"))
1013 ),
1014 &request.request_id,
1015 ),
1016 ))
1017 }
1018
1019 fn describe_db_engine_versions(
1020 &self,
1021 request: &AwsRequest,
1022 ) -> Result<AwsResponse, AwsServiceError> {
1023 let engine = optional_param(request, "Engine");
1024 let engine_version = optional_param(request, "EngineVersion");
1025 let family = optional_param(request, "DBParameterGroupFamily");
1026 let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
1027
1028 let mut versions = filter_engine_versions(
1029 &default_engine_versions(),
1030 &engine,
1031 &engine_version,
1032 &family,
1033 );
1034
1035 if default_only.unwrap_or(false) {
1036 versions.truncate(1);
1037 }
1038
1039 Ok(AwsResponse::xml(
1040 StatusCode::OK,
1041 xml_wrap(
1042 "DescribeDBEngineVersions",
1043 &format!(
1044 "<DBEngineVersions>{}</DBEngineVersions>",
1045 versions.iter().map(engine_version_xml).collect::<String>()
1046 ),
1047 &request.request_id,
1048 ),
1049 ))
1050 }
1051
1052 fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1053 let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1054 let marker = optional_param(request, "Marker");
1055 let max_records = optional_param(request, "MaxRecords");
1056
1057 let accounts = self.state.read();
1058 let empty = RdsState::new(&request.account_id, &request.region);
1059 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1060
1061 if let Some(identifier) = db_instance_identifier {
1063 let instance = state
1064 .instances
1065 .get(&identifier)
1066 .cloned()
1067 .ok_or_else(|| db_instance_not_found(&identifier))?;
1068
1069 return Ok(AwsResponse::xml(
1070 StatusCode::OK,
1071 xml_wrap(
1072 "DescribeDBInstances",
1073 &format!(
1074 "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
1075 db_instance_xml(&instance, None)
1076 ),
1077 &request.request_id,
1078 ),
1079 ));
1080 }
1081
1082 let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
1084 instances.sort_by(|a, b| {
1085 a.created_at
1086 .cmp(&b.created_at)
1087 .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
1088 });
1089
1090 let paginated = paginate(instances, marker, max_records, |inst| {
1092 &inst.db_instance_identifier
1093 })?;
1094
1095 let marker_xml = paginated
1096 .next_marker
1097 .as_ref()
1098 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1099 .unwrap_or_default();
1100
1101 Ok(AwsResponse::xml(
1102 StatusCode::OK,
1103 xml_wrap(
1104 "DescribeDBInstances",
1105 &format!(
1106 "<DBInstances>{}</DBInstances>{}",
1107 paginated
1108 .items
1109 .iter()
1110 .map(|instance| {
1111 format!(
1112 "<DBInstance>{}</DBInstance>",
1113 db_instance_xml(instance, None)
1114 )
1115 })
1116 .collect::<String>(),
1117 marker_xml
1118 ),
1119 &request.request_id,
1120 ),
1121 ))
1122 }
1123
1124 fn describe_orderable_db_instance_options(
1125 &self,
1126 request: &AwsRequest,
1127 ) -> Result<AwsResponse, AwsServiceError> {
1128 let engine = optional_param(request, "Engine");
1129 let engine_version = optional_param(request, "EngineVersion");
1130 let db_instance_class = optional_param(request, "DBInstanceClass");
1131 let license_model = optional_param(request, "LicenseModel");
1132 let vpc = parse_optional_bool(optional_param(request, "Vpc").as_deref())?;
1133
1134 let options = filter_orderable_options(
1135 &default_orderable_options(),
1136 &engine,
1137 &engine_version,
1138 &db_instance_class,
1139 &license_model,
1140 vpc,
1141 );
1142
1143 Ok(AwsResponse::xml(
1144 StatusCode::OK,
1145 xml_wrap(
1146 "DescribeOrderableDBInstanceOptions",
1147 &format!(
1148 "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
1149 options.iter().map(orderable_option_xml).collect::<String>()
1150 ),
1151 &request.request_id,
1152 ),
1153 ))
1154 }
1155
1156 fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1157 let resource_name = required_param(request, "ResourceName")?;
1158 let tags = parse_tags(request)?;
1159
1160 if tags.is_empty() {
1161 return Err(AwsServiceError::aws_error(
1162 StatusCode::BAD_REQUEST,
1163 "MissingParameter",
1164 "The request must contain the parameter Tags.",
1165 ));
1166 }
1167
1168 let mut accounts = self.state.write();
1169 let state = accounts.get_or_create(&request.account_id);
1170 let instance = find_instance_by_arn_mut(state, &resource_name)?;
1171 merge_tags(&mut instance.tags, &tags);
1172
1173 Ok(AwsResponse::xml(
1174 StatusCode::OK,
1175 xml_wrap("AddTagsToResource", "", &request.request_id),
1176 ))
1177 }
1178
1179 fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1180 let resource_name = required_param(request, "ResourceName")?;
1181 if query_param_prefix_exists(request, "Filters.") {
1182 return Err(AwsServiceError::aws_error(
1183 StatusCode::BAD_REQUEST,
1184 "InvalidParameterValue",
1185 "Filters are not yet supported for ListTagsForResource.",
1186 ));
1187 }
1188
1189 let accounts = self.state.read();
1190 let empty = RdsState::new(&request.account_id, &request.region);
1191 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1192 let instance = find_instance_by_arn(state, &resource_name)?;
1193 let tag_xml = instance.tags.iter().map(tag_xml).collect::<String>();
1194
1195 Ok(AwsResponse::xml(
1196 StatusCode::OK,
1197 xml_wrap(
1198 "ListTagsForResource",
1199 &format!("<TagList>{tag_xml}</TagList>"),
1200 &request.request_id,
1201 ),
1202 ))
1203 }
1204
1205 fn remove_tags_from_resource(
1206 &self,
1207 request: &AwsRequest,
1208 ) -> Result<AwsResponse, AwsServiceError> {
1209 let resource_name = required_param(request, "ResourceName")?;
1210 let tag_keys = parse_tag_keys(request)?;
1211
1212 if tag_keys.is_empty() {
1213 return Err(AwsServiceError::aws_error(
1214 StatusCode::BAD_REQUEST,
1215 "MissingParameter",
1216 "The request must contain the parameter TagKeys.",
1217 ));
1218 }
1219
1220 let mut accounts = self.state.write();
1221 let state = accounts.get_or_create(&request.account_id);
1222 let instance = find_instance_by_arn_mut(state, &resource_name)?;
1223 instance
1224 .tags
1225 .retain(|tag| !tag_keys.iter().any(|key| key == &tag.key));
1226
1227 Ok(AwsResponse::xml(
1228 StatusCode::OK,
1229 xml_wrap("RemoveTagsFromResource", "", &request.request_id),
1230 ))
1231 }
1232
1233 async fn create_db_snapshot(
1234 &self,
1235 request: &AwsRequest,
1236 ) -> Result<AwsResponse, AwsServiceError> {
1237 let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1238 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1239
1240 let runtime = self.runtime.as_ref().ok_or_else(|| {
1241 AwsServiceError::aws_error(
1242 StatusCode::SERVICE_UNAVAILABLE,
1243 "InvalidParameterValue",
1244 "Docker/Podman is required for RDS snapshots but is not available",
1245 )
1246 })?;
1247
1248 let (instance, db_name) = {
1249 let accounts = self.state.read();
1250 let empty = RdsState::new(&request.account_id, &request.region);
1251 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1252
1253 if state.snapshots.contains_key(&db_snapshot_identifier) {
1254 return Err(AwsServiceError::aws_error(
1255 StatusCode::CONFLICT,
1256 "DBSnapshotAlreadyExists",
1257 format!("DBSnapshot {db_snapshot_identifier} already exists."),
1258 ));
1259 }
1260
1261 let instance = state
1262 .instances
1263 .get(&db_instance_identifier)
1264 .cloned()
1265 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1266
1267 let default_db = default_db_name(&instance.engine);
1268 let db_name = instance
1269 .db_name
1270 .as_deref()
1271 .unwrap_or(default_db)
1272 .to_string();
1273
1274 (instance, db_name)
1275 };
1276
1277 let dump_data = runtime
1278 .dump_database(
1279 &db_instance_identifier,
1280 &instance.engine,
1281 &instance.master_username,
1282 &instance.master_user_password,
1283 &db_name,
1284 )
1285 .await
1286 .map_err(runtime_error_to_service_error)?;
1287
1288 let mut accounts = self.state.write();
1289 let state = accounts.get_or_create(&request.account_id);
1290
1291 if state.snapshots.contains_key(&db_snapshot_identifier) {
1292 return Err(AwsServiceError::aws_error(
1293 StatusCode::CONFLICT,
1294 "DBSnapshotAlreadyExists",
1295 format!("DBSnapshot {db_snapshot_identifier} already exists."),
1296 ));
1297 }
1298
1299 let snapshot = DbSnapshot {
1300 db_snapshot_identifier: db_snapshot_identifier.clone(),
1301 db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
1302 db_instance_identifier: instance.db_instance_identifier.clone(),
1303 snapshot_create_time: Utc::now(),
1304 engine: instance.engine.clone(),
1305 engine_version: instance.engine_version.clone(),
1306 allocated_storage: instance.allocated_storage,
1307 status: "available".to_string(),
1308 port: instance.port,
1309 master_username: instance.master_username.clone(),
1310 db_name: instance.db_name.clone(),
1311 dbi_resource_id: instance.dbi_resource_id.clone(),
1312 snapshot_type: "manual".to_string(),
1313 master_user_password: instance.master_user_password.clone(),
1314 tags: Vec::new(),
1315 dump_data,
1316 };
1317
1318 state
1319 .snapshots
1320 .insert(db_snapshot_identifier.clone(), snapshot.clone());
1321 let snapshot_arn = snapshot.db_snapshot_arn.clone();
1322 drop(accounts);
1323
1324 self.emit_event(
1325 RdsSourceType::DbSnapshot,
1326 &db_snapshot_identifier,
1327 &snapshot_arn,
1328 "RDS-EVENT-0042",
1329 &["creation"],
1330 "Manual snapshot created",
1331 );
1332
1333 Ok(AwsResponse::xml(
1334 StatusCode::OK,
1335 xml_wrap(
1336 "CreateDBSnapshot",
1337 &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1338 &request.request_id,
1339 ),
1340 ))
1341 }
1342
1343 fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1344 let db_snapshot_identifier = optional_param(request, "DBSnapshotIdentifier");
1345 let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1346 let marker = optional_param(request, "Marker");
1347 let max_records = optional_param(request, "MaxRecords");
1348
1349 if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
1350 return Err(AwsServiceError::aws_error(
1351 StatusCode::BAD_REQUEST,
1352 "InvalidParameterCombination",
1353 "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
1354 ));
1355 }
1356
1357 let accounts = self.state.read();
1358 let empty = RdsState::new(&request.account_id, &request.region);
1359 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1360
1361 if let Some(snapshot_id) = db_snapshot_identifier {
1363 let snapshot = state
1364 .snapshots
1365 .get(&snapshot_id)
1366 .cloned()
1367 .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
1368
1369 return Ok(AwsResponse::xml(
1370 StatusCode::OK,
1371 xml_wrap(
1372 "DescribeDBSnapshots",
1373 &format!(
1374 "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
1375 db_snapshot_xml(&snapshot)
1376 ),
1377 &request.request_id,
1378 ),
1379 ));
1380 }
1381
1382 let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
1384 state
1385 .snapshots
1386 .values()
1387 .filter(|s| s.db_instance_identifier == instance_id)
1388 .cloned()
1389 .collect()
1390 } else {
1391 state.snapshots.values().cloned().collect()
1392 };
1393
1394 snapshots.sort_by(|a, b| {
1396 a.snapshot_create_time
1397 .cmp(&b.snapshot_create_time)
1398 .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
1399 });
1400
1401 let paginated = paginate(snapshots, marker, max_records, |snap| {
1403 &snap.db_snapshot_identifier
1404 })?;
1405
1406 let marker_xml = paginated
1407 .next_marker
1408 .as_ref()
1409 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1410 .unwrap_or_default();
1411
1412 Ok(AwsResponse::xml(
1413 StatusCode::OK,
1414 xml_wrap(
1415 "DescribeDBSnapshots",
1416 &format!(
1417 "<DBSnapshots>{}</DBSnapshots>{}",
1418 paginated
1419 .items
1420 .iter()
1421 .map(|snapshot| format!(
1422 "<DBSnapshot>{}</DBSnapshot>",
1423 db_snapshot_xml(snapshot)
1424 ))
1425 .collect::<String>(),
1426 marker_xml
1427 ),
1428 &request.request_id,
1429 ),
1430 ))
1431 }
1432
1433 fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1434 let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1435
1436 let mut accounts = self.state.write();
1437 let state = accounts.get_or_create(&request.account_id);
1438
1439 let snapshot = state
1440 .snapshots
1441 .remove(&db_snapshot_identifier)
1442 .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1443 let snapshot_arn = snapshot.db_snapshot_arn.clone();
1444 drop(accounts);
1445
1446 self.emit_event(
1447 RdsSourceType::DbSnapshot,
1448 &db_snapshot_identifier,
1449 &snapshot_arn,
1450 "RDS-EVENT-0041",
1451 &["deletion"],
1452 "Manual snapshot deleted",
1453 );
1454
1455 Ok(AwsResponse::xml(
1456 StatusCode::OK,
1457 xml_wrap(
1458 "DeleteDBSnapshot",
1459 &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1460 &request.request_id,
1461 ),
1462 ))
1463 }
1464
1465 async fn restore_db_instance_from_db_snapshot(
1466 &self,
1467 request: &AwsRequest,
1468 ) -> Result<AwsResponse, AwsServiceError> {
1469 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1470 let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1471 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
1472
1473 let runtime = self.require_runtime()?;
1474
1475 let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
1476 let mut accounts = self.state.write();
1477 let state = accounts.get_or_create(&request.account_id);
1478
1479 if !state.begin_instance_creation(&db_instance_identifier) {
1480 return Err(AwsServiceError::aws_error(
1481 StatusCode::CONFLICT,
1482 "DBInstanceAlreadyExists",
1483 format!("DBInstance {db_instance_identifier} already exists."),
1484 ));
1485 }
1486
1487 let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
1488 Some(s) => s,
1489 None => {
1490 state.cancel_instance_creation(&db_instance_identifier);
1491 return Err(db_snapshot_not_found(&db_snapshot_identifier));
1492 }
1493 };
1494
1495 let dbi_resource_id = state.next_dbi_resource_id();
1496 let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
1497 let created_at = Utc::now();
1498
1499 (snapshot, dbi_resource_id, db_instance_arn, created_at)
1500 };
1501
1502 let db_name = snapshot
1503 .db_name
1504 .as_deref()
1505 .unwrap_or(default_db_name(&snapshot.engine));
1506 let running = match runtime
1507 .ensure_postgres(
1508 &db_instance_identifier,
1509 &snapshot.engine,
1510 &snapshot.engine_version,
1511 &snapshot.master_username,
1512 &snapshot.master_user_password,
1513 db_name,
1514 &request.account_id,
1515 &request.region,
1516 )
1517 .await
1518 {
1519 Ok(running) => running,
1520 Err(e) => {
1521 self.state
1522 .write()
1523 .get_or_create(&request.account_id)
1524 .cancel_instance_creation(&db_instance_identifier);
1525 return Err(runtime_error_to_service_error(e));
1526 }
1527 };
1528
1529 if let Err(e) = runtime
1530 .restore_database(
1531 &db_instance_identifier,
1532 &snapshot.engine,
1533 &snapshot.master_username,
1534 &snapshot.master_user_password,
1535 db_name,
1536 &snapshot.dump_data,
1537 )
1538 .await
1539 {
1540 self.state
1541 .write()
1542 .get_or_create(&request.account_id)
1543 .cancel_instance_creation(&db_instance_identifier);
1544 runtime.stop_container(&db_instance_identifier).await;
1545 return Err(runtime_error_to_service_error(e));
1546 }
1547
1548 let instance = build_restored_instance(
1549 &db_instance_identifier,
1550 db_instance_arn,
1551 dbi_resource_id,
1552 created_at,
1553 vpc_security_group_ids,
1554 &snapshot,
1555 &running,
1556 );
1557
1558 self.state
1559 .write()
1560 .get_or_create(&request.account_id)
1561 .finish_instance_creation(instance.clone());
1562
1563 self.emit_event(
1564 RdsSourceType::DbInstance,
1565 &db_instance_identifier,
1566 &instance.db_instance_arn,
1567 "RDS-EVENT-0043",
1568 &["creation"],
1569 "DB instance restored from snapshot",
1570 );
1571
1572 Ok(AwsResponse::xml(
1573 StatusCode::OK,
1574 xml_wrap(
1575 "RestoreDBInstanceFromDBSnapshot",
1576 &format!(
1577 "<DBInstance>{}</DBInstance>",
1578 db_instance_xml(&instance, None)
1579 ),
1580 &request.request_id,
1581 ),
1582 ))
1583 }
1584
1585 async fn create_db_instance_read_replica(
1586 &self,
1587 request: &AwsRequest,
1588 ) -> Result<AwsResponse, AwsServiceError> {
1589 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1590 let source_db_instance_identifier = required_param(request, "SourceDBInstanceIdentifier")?;
1591
1592 let runtime = self.runtime.as_ref().ok_or_else(|| {
1593 AwsServiceError::aws_error(
1594 StatusCode::SERVICE_UNAVAILABLE,
1595 "InvalidParameterValue",
1596 "Docker/Podman is required for RDS read replicas but is not available",
1597 )
1598 })?;
1599
1600 let (source_instance, db_name) = {
1601 let mut accounts = self.state.write();
1602 let state = accounts.get_or_create(&request.account_id);
1603
1604 if !state.begin_instance_creation(&db_instance_identifier) {
1605 return Err(AwsServiceError::aws_error(
1606 StatusCode::CONFLICT,
1607 "DBInstanceAlreadyExists",
1608 format!("DBInstance {db_instance_identifier} already exists."),
1609 ));
1610 }
1611
1612 let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
1613 {
1614 Some(inst) => inst,
1615 None => {
1616 state.cancel_instance_creation(&db_instance_identifier);
1617 return Err(db_instance_not_found(&source_db_instance_identifier));
1618 }
1619 };
1620
1621 let default_db = default_db_name(&source_instance.engine);
1622 let db_name = source_instance
1623 .db_name
1624 .as_deref()
1625 .unwrap_or(default_db)
1626 .to_string();
1627
1628 (source_instance, db_name)
1629 };
1630
1631 let dump_data = match runtime
1632 .dump_database(
1633 &source_db_instance_identifier,
1634 &source_instance.engine,
1635 &source_instance.master_username,
1636 &source_instance.master_user_password,
1637 &db_name,
1638 )
1639 .await
1640 {
1641 Ok(data) => data,
1642 Err(e) => {
1643 self.state
1644 .write()
1645 .get_or_create(&request.account_id)
1646 .cancel_instance_creation(&db_instance_identifier);
1647 return Err(runtime_error_to_service_error(e));
1648 }
1649 };
1650
1651 let (dbi_resource_id, db_instance_arn) = {
1652 let accounts = self.state.read();
1653 let empty = RdsState::new(&request.account_id, &request.region);
1654 let s = accounts.get(&request.account_id).unwrap_or(&empty);
1655 (
1656 s.next_dbi_resource_id(),
1657 s.db_instance_arn(&db_instance_identifier),
1658 )
1659 };
1660 let created_at = Utc::now();
1661
1662 let running = match runtime
1663 .ensure_postgres(
1664 &db_instance_identifier,
1665 &source_instance.engine,
1666 &source_instance.engine_version,
1667 &source_instance.master_username,
1668 &source_instance.master_user_password,
1669 &db_name,
1670 &request.account_id,
1671 &request.region,
1672 )
1673 .await
1674 {
1675 Ok(running) => running,
1676 Err(e) => {
1677 self.state
1678 .write()
1679 .get_or_create(&request.account_id)
1680 .cancel_instance_creation(&db_instance_identifier);
1681 return Err(runtime_error_to_service_error(e));
1682 }
1683 };
1684
1685 if let Err(e) = runtime
1686 .restore_database(
1687 &db_instance_identifier,
1688 &source_instance.engine,
1689 &source_instance.master_username,
1690 &source_instance.master_user_password,
1691 &db_name,
1692 &dump_data,
1693 )
1694 .await
1695 {
1696 self.state
1697 .write()
1698 .get_or_create(&request.account_id)
1699 .cancel_instance_creation(&db_instance_identifier);
1700 runtime.stop_container(&db_instance_identifier).await;
1701 return Err(runtime_error_to_service_error(e));
1702 }
1703
1704 let replica = build_read_replica_instance(
1705 &db_instance_identifier,
1706 db_instance_arn,
1707 dbi_resource_id,
1708 created_at,
1709 &source_db_instance_identifier,
1710 &source_instance,
1711 &running,
1712 );
1713
1714 let source_missing = {
1715 let mut accounts = self.state.write();
1716 let state = accounts.get_or_create(&request.account_id);
1717 match state.instances.get_mut(&source_db_instance_identifier) {
1718 Some(source) => {
1719 source
1720 .read_replica_db_instance_identifiers
1721 .push(db_instance_identifier.clone());
1722 state.finish_instance_creation(replica.clone());
1723 false
1724 }
1725 None => {
1726 state.cancel_instance_creation(&db_instance_identifier);
1727 true
1728 }
1729 }
1730 };
1731
1732 if source_missing {
1733 runtime.stop_container(&db_instance_identifier).await;
1734 return Err(db_instance_not_found(&source_db_instance_identifier));
1735 }
1736
1737 self.emit_event(
1738 RdsSourceType::DbInstance,
1739 &db_instance_identifier,
1740 &replica.db_instance_arn,
1741 "RDS-EVENT-0005",
1742 &["creation", "read replica"],
1743 "Read replica DB instance created",
1744 );
1745
1746 Ok(AwsResponse::xml(
1747 StatusCode::OK,
1748 xml_wrap(
1749 "CreateDBInstanceReadReplica",
1750 &format!(
1751 "<DBInstance>{}</DBInstance>",
1752 db_instance_xml(&replica, None)
1753 ),
1754 &request.request_id,
1755 ),
1756 ))
1757 }
1758
1759 fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1760 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1761 let db_subnet_group_description = required_param(request, "DBSubnetGroupDescription")?;
1762 let subnet_ids = parse_subnet_ids(request)?;
1763
1764 if subnet_ids.is_empty() {
1765 return Err(AwsServiceError::aws_error(
1766 StatusCode::BAD_REQUEST,
1767 "InvalidParameterValue",
1768 "At least one subnet must be specified.",
1769 ));
1770 }
1771
1772 if subnet_ids.len() < 2 {
1773 return Err(AwsServiceError::aws_error(
1774 StatusCode::BAD_REQUEST,
1775 "DBSubnetGroupDoesNotCoverEnoughAZs",
1776 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1777 ));
1778 }
1779
1780 let mut accounts = self.state.write();
1781 let state = accounts.get_or_create(&request.account_id);
1782
1783 if state.subnet_groups.contains_key(&db_subnet_group_name) {
1784 return Err(AwsServiceError::aws_error(
1785 StatusCode::CONFLICT,
1786 "DBSubnetGroupAlreadyExists",
1787 format!("DBSubnetGroup {db_subnet_group_name} already exists."),
1788 ));
1789 }
1790
1791 let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
1792 let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1793 .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
1794 .collect();
1795
1796 let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1798 if unique_azs.len() < 2 {
1799 return Err(AwsServiceError::aws_error(
1800 StatusCode::BAD_REQUEST,
1801 "DBSubnetGroupDoesNotCoverEnoughAZs",
1802 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1803 ));
1804 }
1805
1806 let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
1807 let tags = parse_tags(request)?;
1808
1809 let subnet_group = DbSubnetGroup {
1810 db_subnet_group_name: db_subnet_group_name.clone(),
1811 db_subnet_group_arn,
1812 db_subnet_group_description,
1813 vpc_id,
1814 subnet_ids,
1815 subnet_availability_zones,
1816 tags,
1817 };
1818
1819 state
1820 .subnet_groups
1821 .insert(db_subnet_group_name, subnet_group.clone());
1822
1823 Ok(AwsResponse::xml(
1824 StatusCode::OK,
1825 xml_wrap(
1826 "CreateDBSubnetGroup",
1827 &format!(
1828 "<DBSubnetGroup>{}</DBSubnetGroup>",
1829 db_subnet_group_xml(&subnet_group)
1830 ),
1831 &request.request_id,
1832 ),
1833 ))
1834 }
1835
1836 fn describe_db_subnet_groups(
1837 &self,
1838 request: &AwsRequest,
1839 ) -> Result<AwsResponse, AwsServiceError> {
1840 let db_subnet_group_name = optional_param(request, "DBSubnetGroupName");
1841 let marker = optional_param(request, "Marker");
1842 let max_records = optional_param(request, "MaxRecords");
1843
1844 let accounts = self.state.read();
1845 let empty = RdsState::new(&request.account_id, &request.region);
1846 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1847
1848 if let Some(name) = db_subnet_group_name {
1850 let sg = state.subnet_groups.get(&name).ok_or_else(|| {
1851 AwsServiceError::aws_error(
1852 StatusCode::NOT_FOUND,
1853 "DBSubnetGroupNotFoundFault",
1854 format!("DBSubnetGroup {} not found.", name),
1855 )
1856 })?;
1857
1858 return Ok(AwsResponse::xml(
1859 StatusCode::OK,
1860 xml_wrap(
1861 "DescribeDBSubnetGroups",
1862 &format!(
1863 "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
1864 db_subnet_group_xml(sg)
1865 ),
1866 &request.request_id,
1867 ),
1868 ));
1869 }
1870
1871 let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
1873 subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
1874
1875 let paginated = paginate(subnet_groups, marker, max_records, |sg| {
1877 &sg.db_subnet_group_name
1878 })?;
1879
1880 let marker_xml = paginated
1881 .next_marker
1882 .as_ref()
1883 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1884 .unwrap_or_default();
1885
1886 let body = paginated
1887 .items
1888 .iter()
1889 .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
1890 .collect::<Vec<_>>()
1891 .join("");
1892
1893 Ok(AwsResponse::xml(
1894 StatusCode::OK,
1895 xml_wrap(
1896 "DescribeDBSubnetGroups",
1897 &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
1898 &request.request_id,
1899 ),
1900 ))
1901 }
1902
1903 fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1904 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1905
1906 let mut accounts = self.state.write();
1907 let state = accounts.get_or_create(&request.account_id);
1908
1909 if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
1910 return Err(AwsServiceError::aws_error(
1911 StatusCode::NOT_FOUND,
1912 "DBSubnetGroupNotFoundFault",
1913 format!("DBSubnetGroup {db_subnet_group_name} not found."),
1914 ));
1915 }
1916
1917 Ok(AwsResponse::xml(
1918 StatusCode::OK,
1919 xml_wrap("DeleteDBSubnetGroup", "", &request.request_id),
1920 ))
1921 }
1922
1923 fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1924 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1925 let subnet_ids = parse_subnet_ids(request)?;
1926
1927 if subnet_ids.is_empty() {
1928 return Err(AwsServiceError::aws_error(
1929 StatusCode::BAD_REQUEST,
1930 "InvalidParameterValue",
1931 "At least one subnet must be specified.",
1932 ));
1933 }
1934
1935 if subnet_ids.len() < 2 {
1936 return Err(AwsServiceError::aws_error(
1937 StatusCode::BAD_REQUEST,
1938 "DBSubnetGroupDoesNotCoverEnoughAZs",
1939 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1940 ));
1941 }
1942
1943 let mut accounts = self.state.write();
1944 let state = accounts.get_or_create(&request.account_id);
1945
1946 let region = state.region.clone();
1947
1948 let subnet_group = state
1949 .subnet_groups
1950 .get_mut(&db_subnet_group_name)
1951 .ok_or_else(|| {
1952 AwsServiceError::aws_error(
1953 StatusCode::NOT_FOUND,
1954 "DBSubnetGroupNotFoundFault",
1955 format!("DBSubnetGroup {db_subnet_group_name} not found."),
1956 )
1957 })?;
1958
1959 let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1960 .map(|i| format!("{}{}", ®ion, char::from(b'a' + (i % 6) as u8)))
1961 .collect();
1962
1963 let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1965 if unique_azs.len() < 2 {
1966 return Err(AwsServiceError::aws_error(
1967 StatusCode::BAD_REQUEST,
1968 "DBSubnetGroupDoesNotCoverEnoughAZs",
1969 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1970 ));
1971 }
1972
1973 subnet_group.subnet_ids = subnet_ids;
1974 subnet_group.subnet_availability_zones = subnet_availability_zones;
1975
1976 let subnet_group_clone = subnet_group.clone();
1977
1978 Ok(AwsResponse::xml(
1979 StatusCode::OK,
1980 xml_wrap(
1981 "ModifyDBSubnetGroup",
1982 &format!(
1983 "<DBSubnetGroup>{}</DBSubnetGroup>",
1984 db_subnet_group_xml(&subnet_group_clone)
1985 ),
1986 &request.request_id,
1987 ),
1988 ))
1989 }
1990
1991 fn create_db_parameter_group(
1992 &self,
1993 request: &AwsRequest,
1994 ) -> Result<AwsResponse, AwsServiceError> {
1995 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1996 let db_parameter_group_family = required_param(request, "DBParameterGroupFamily")?;
1997 let description = required_param(request, "Description")?;
1998
1999 let valid_families = [
2001 "postgres16",
2002 "postgres15",
2003 "postgres14",
2004 "postgres13",
2005 "mysql8.0",
2006 "mysql5.7",
2007 "mariadb10.11",
2008 "mariadb10.6",
2009 ];
2010
2011 if !valid_families.contains(&db_parameter_group_family.as_str()) {
2012 return Err(AwsServiceError::aws_error(
2013 StatusCode::BAD_REQUEST,
2014 "InvalidParameterValue",
2015 format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
2016 ));
2017 }
2018
2019 let mut accounts = self.state.write();
2020 let state = accounts.get_or_create(&request.account_id);
2021
2022 if state
2023 .parameter_groups
2024 .contains_key(&db_parameter_group_name)
2025 {
2026 return Err(AwsServiceError::aws_error(
2027 StatusCode::CONFLICT,
2028 "DBParameterGroupAlreadyExists",
2029 format!("DBParameterGroup {db_parameter_group_name} already exists."),
2030 ));
2031 }
2032
2033 let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
2034 let tags = parse_tags(request)?;
2035
2036 let parameter_group = DbParameterGroup {
2037 db_parameter_group_name: db_parameter_group_name.clone(),
2038 db_parameter_group_arn,
2039 db_parameter_group_family,
2040 description,
2041 parameters: std::collections::HashMap::new(),
2042 tags,
2043 };
2044
2045 state
2046 .parameter_groups
2047 .insert(db_parameter_group_name, parameter_group.clone());
2048
2049 Ok(AwsResponse::xml(
2050 StatusCode::OK,
2051 xml_wrap(
2052 "CreateDBParameterGroup",
2053 &format!(
2054 "<DBParameterGroup>{}</DBParameterGroup>",
2055 db_parameter_group_xml(¶meter_group)
2056 ),
2057 &request.request_id,
2058 ),
2059 ))
2060 }
2061
2062 fn describe_db_parameter_groups(
2063 &self,
2064 request: &AwsRequest,
2065 ) -> Result<AwsResponse, AwsServiceError> {
2066 let db_parameter_group_name = optional_param(request, "DBParameterGroupName");
2067 let marker = optional_param(request, "Marker");
2068 let max_records = optional_param(request, "MaxRecords");
2069
2070 let accounts = self.state.read();
2071 let empty = RdsState::new(&request.account_id, &request.region);
2072 let state = accounts.get(&request.account_id).unwrap_or(&empty);
2073
2074 if let Some(name) = db_parameter_group_name {
2076 let pg = state.parameter_groups.get(&name).ok_or_else(|| {
2077 AwsServiceError::aws_error(
2078 StatusCode::NOT_FOUND,
2079 "DBParameterGroupNotFound",
2080 format!("DBParameterGroup {} not found.", name),
2081 )
2082 })?;
2083
2084 return Ok(AwsResponse::xml(
2085 StatusCode::OK,
2086 xml_wrap(
2087 "DescribeDBParameterGroups",
2088 &format!(
2089 "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
2090 db_parameter_group_xml(pg)
2091 ),
2092 &request.request_id,
2093 ),
2094 ));
2095 }
2096
2097 let mut parameter_groups: Vec<DbParameterGroup> =
2099 state.parameter_groups.values().cloned().collect();
2100 parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
2101
2102 let paginated = paginate(parameter_groups, marker, max_records, |pg| {
2104 &pg.db_parameter_group_name
2105 })?;
2106
2107 let marker_xml = paginated
2108 .next_marker
2109 .as_ref()
2110 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
2111 .unwrap_or_default();
2112
2113 let body = paginated
2114 .items
2115 .iter()
2116 .map(|pg| {
2117 format!(
2118 "<DBParameterGroup>{}</DBParameterGroup>",
2119 db_parameter_group_xml(pg)
2120 )
2121 })
2122 .collect::<Vec<_>>()
2123 .join("");
2124
2125 Ok(AwsResponse::xml(
2126 StatusCode::OK,
2127 xml_wrap(
2128 "DescribeDBParameterGroups",
2129 &format!(
2130 "<DBParameterGroups>{}</DBParameterGroups>{}",
2131 body, marker_xml
2132 ),
2133 &request.request_id,
2134 ),
2135 ))
2136 }
2137
2138 fn delete_db_parameter_group(
2139 &self,
2140 request: &AwsRequest,
2141 ) -> Result<AwsResponse, AwsServiceError> {
2142 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2143
2144 let mut accounts = self.state.write();
2145 let state = accounts.get_or_create(&request.account_id);
2146
2147 if db_parameter_group_name.starts_with("default.") {
2148 return Err(AwsServiceError::aws_error(
2149 StatusCode::BAD_REQUEST,
2150 "InvalidParameterValue",
2151 "Cannot delete default parameter groups.",
2152 ));
2153 }
2154
2155 if state
2156 .parameter_groups
2157 .remove(&db_parameter_group_name)
2158 .is_none()
2159 {
2160 return Err(AwsServiceError::aws_error(
2161 StatusCode::NOT_FOUND,
2162 "DBParameterGroupNotFound",
2163 format!("DBParameterGroup {db_parameter_group_name} not found."),
2164 ));
2165 }
2166
2167 Ok(AwsResponse::xml(
2168 StatusCode::OK,
2169 xml_wrap("DeleteDBParameterGroup", "", &request.request_id),
2170 ))
2171 }
2172
2173 fn modify_db_parameter_group(
2174 &self,
2175 request: &AwsRequest,
2176 ) -> Result<AwsResponse, AwsServiceError> {
2177 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2178
2179 let mut accounts = self.state.write();
2180 let state = accounts.get_or_create(&request.account_id);
2181
2182 let parameter_group = state
2183 .parameter_groups
2184 .get_mut(&db_parameter_group_name)
2185 .ok_or_else(|| {
2186 AwsServiceError::aws_error(
2187 StatusCode::NOT_FOUND,
2188 "DBParameterGroupNotFound",
2189 format!("DBParameterGroup {db_parameter_group_name} not found."),
2190 )
2191 })?;
2192
2193 if let Some(new_description) = optional_param(request, "Description") {
2194 parameter_group.description = new_description;
2195 }
2196
2197 let parameter_group_clone = parameter_group.clone();
2198
2199 Ok(AwsResponse::xml(
2200 StatusCode::OK,
2201 xml_wrap(
2202 "ModifyDBParameterGroup",
2203 &format!(
2204 "<DBParameterGroupName>{}</DBParameterGroupName>",
2205 xml_escape(¶meter_group_clone.db_parameter_group_name)
2206 ),
2207 &request.request_id,
2208 ),
2209 ))
2210 }
2211}
2212
2213fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
2214 fakecloud_core::query::optional_query_param(req, name)
2215}
2216
2217fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
2218 fakecloud_core::query::required_query_param(req, name)
2219}
2220
2221fn required_i32_param(req: &AwsRequest, name: &str) -> Result<i32, AwsServiceError> {
2222 let value = required_param(req, name)?;
2223 value.parse::<i32>().map_err(|_| {
2224 AwsServiceError::aws_error(
2225 StatusCode::BAD_REQUEST,
2226 "InvalidParameterValue",
2227 format!("Parameter {name} must be a valid integer."),
2228 )
2229 })
2230}
2231
2232fn optional_i32_param(req: &AwsRequest, name: &str) -> Result<Option<i32>, AwsServiceError> {
2233 optional_param(req, name)
2234 .map(|value| {
2235 value.parse::<i32>().map_err(|_| {
2236 AwsServiceError::aws_error(
2237 StatusCode::BAD_REQUEST,
2238 "InvalidParameterValue",
2239 format!("Parameter {name} must be a valid integer."),
2240 )
2241 })
2242 })
2243 .transpose()
2244}
2245
2246fn parse_tags(req: &AwsRequest) -> Result<Vec<RdsTag>, AwsServiceError> {
2247 let mut tags = Vec::new();
2248 for index in 1.. {
2249 let key_name = format!("Tags.Tag.{index}.Key");
2250 let value_name = format!("Tags.Tag.{index}.Value");
2251 let key = optional_param(req, &key_name);
2252 let value = optional_param(req, &value_name);
2253
2254 match (key, value) {
2255 (Some(key), Some(value)) => tags.push(RdsTag { key, value }),
2256 (None, None) => break,
2257 _ => {
2258 return Err(AwsServiceError::aws_error(
2259 StatusCode::BAD_REQUEST,
2260 "InvalidParameterValue",
2261 "Each tag must include both Key and Value.",
2262 ));
2263 }
2264 }
2265 }
2266
2267 Ok(tags)
2268}
2269
2270fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
2271 let mut keys = Vec::new();
2272 for index in 1.. {
2273 let key_name = format!("TagKeys.member.{index}");
2274 match optional_param(req, &key_name) {
2275 Some(key) => keys.push(key),
2276 None => break,
2277 }
2278 }
2279
2280 Ok(keys)
2281}
2282
2283fn parse_subnet_ids(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
2284 let mut subnet_ids = Vec::new();
2285 for index in 1.. {
2286 let subnet_id_name = format!("SubnetIds.SubnetIdentifier.{index}");
2287 match optional_param(req, &subnet_id_name) {
2288 Some(subnet_id) => subnet_ids.push(subnet_id),
2289 None => break,
2290 }
2291 }
2292
2293 Ok(subnet_ids)
2294}
2295
2296fn parse_vpc_security_group_ids(req: &AwsRequest) -> Vec<String> {
2297 let mut security_group_ids = Vec::new();
2298 for index in 1.. {
2299 let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
2300 match optional_param(req, &sg_id_name) {
2301 Some(sg_id) => security_group_ids.push(sg_id),
2302 None => break,
2303 }
2304 }
2305
2306 if security_group_ids.is_empty() {
2308 security_group_ids.push("sg-default".to_string());
2309 }
2310
2311 security_group_ids
2312}
2313
2314fn query_param_prefix_exists(req: &AwsRequest, prefix: &str) -> bool {
2315 req.query_params.keys().any(|key| key.starts_with(prefix))
2316}
2317
2318fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
2319 value
2320 .map(|raw| match raw {
2321 "true" | "True" | "TRUE" => Ok(true),
2322 "false" | "False" | "FALSE" => Ok(false),
2323 _ => Err(AwsServiceError::aws_error(
2324 StatusCode::BAD_REQUEST,
2325 "InvalidParameterValue",
2326 format!("Boolean parameter value '{raw}' is invalid."),
2327 )),
2328 })
2329 .transpose()
2330}
2331
2332struct PaginationResult<T> {
2333 items: Vec<T>,
2334 next_marker: Option<String>,
2335}
2336
2337fn paginate<T, F>(
2338 mut items: Vec<T>,
2339 marker: Option<String>,
2340 max_records: Option<String>,
2341 get_id: F,
2342) -> Result<PaginationResult<T>, AwsServiceError>
2343where
2344 F: Fn(&T) -> &str,
2345{
2346 let max = if let Some(max_str) = max_records {
2348 let parsed = max_str.parse::<i32>().map_err(|_| {
2349 AwsServiceError::aws_error(
2350 StatusCode::BAD_REQUEST,
2351 "InvalidParameterValue",
2352 "MaxRecords must be a valid integer.",
2353 )
2354 })?;
2355 if !(1..=100).contains(&parsed) {
2356 return Err(AwsServiceError::aws_error(
2357 StatusCode::BAD_REQUEST,
2358 "InvalidParameterValue",
2359 "MaxRecords must be between 1 and 100.",
2360 ));
2361 }
2362 parsed as usize
2363 } else {
2364 100
2365 };
2366
2367 let start_id = if let Some(encoded_marker) = marker {
2369 let decoded = BASE64.decode(encoded_marker.as_bytes()).map_err(|_| {
2370 AwsServiceError::aws_error(
2371 StatusCode::BAD_REQUEST,
2372 "InvalidParameterValue",
2373 "Marker is invalid.",
2374 )
2375 })?;
2376 let id = String::from_utf8(decoded).map_err(|_| {
2377 AwsServiceError::aws_error(
2378 StatusCode::BAD_REQUEST,
2379 "InvalidParameterValue",
2380 "Marker is invalid.",
2381 )
2382 })?;
2383 Some(id)
2384 } else {
2385 None
2386 };
2387
2388 let start_index = if let Some(ref start_id) = start_id {
2390 items
2391 .iter()
2392 .position(|item| get_id(item) == start_id)
2393 .map(|pos| pos + 1) .unwrap_or(items.len()) } else {
2396 0
2397 };
2398
2399 let total_items = items.len();
2401 let end_index = std::cmp::min(start_index + max, total_items);
2402 let paginated_items: Vec<T> = items.drain(start_index..end_index).collect();
2403
2404 let next_marker = if end_index < total_items {
2406 paginated_items
2407 .last()
2408 .map(|item| BASE64.encode(get_id(item).as_bytes()))
2409 } else {
2410 None
2411 };
2412
2413 Ok(PaginationResult {
2414 items: paginated_items,
2415 next_marker,
2416 })
2417}
2418
2419fn validate_create_request(
2420 db_instance_identifier: &str,
2421 allocated_storage: i32,
2422 db_instance_class: &str,
2423 engine: &str,
2424 engine_version: &str,
2425 port: i32,
2426) -> Result<(), AwsServiceError> {
2427 if allocated_storage <= 0 {
2428 return Err(AwsServiceError::aws_error(
2429 StatusCode::BAD_REQUEST,
2430 "InvalidParameterValue",
2431 "AllocatedStorage must be greater than zero.",
2432 ));
2433 }
2434 if port <= 0 {
2435 return Err(AwsServiceError::aws_error(
2436 StatusCode::BAD_REQUEST,
2437 "InvalidParameterValue",
2438 "Port must be greater than zero.",
2439 ));
2440 }
2441 if !db_instance_identifier
2442 .chars()
2443 .all(|ch| ch.is_ascii_alphanumeric() || ch == '-')
2444 {
2445 return Err(AwsServiceError::aws_error(
2446 StatusCode::BAD_REQUEST,
2447 "InvalidParameterValue",
2448 "DBInstanceIdentifier must contain only alphanumeric characters or hyphens.",
2449 ));
2450 }
2451 let supported_engines = [
2453 "postgres",
2454 "mysql",
2455 "mariadb",
2456 "oracle-ee",
2457 "oracle-se2",
2458 "oracle-ee-cdb",
2459 "oracle-se2-cdb",
2460 "sqlserver-ee",
2461 "sqlserver-se",
2462 "sqlserver-ex",
2463 "sqlserver-web",
2464 "db2-se",
2465 "db2-ae",
2466 ];
2467 if !supported_engines.contains(&engine) {
2468 return Err(AwsServiceError::aws_error(
2469 StatusCode::BAD_REQUEST,
2470 "InvalidParameterValue",
2471 format!("Engine '{}' is not supported.", engine),
2472 ));
2473 }
2474
2475 let supported_versions = match engine {
2484 "postgres" => vec!["16", "15", "14", "13", "16.3", "15.5", "14.10", "13.13"],
2485 "mysql" => vec!["8.0", "8.0.35", "8.0.28", "5.7.44"],
2486 "mariadb" => vec!["10.6", "10.11", "11.4", "11.4.5", "10.11.6", "10.6.16"],
2487 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
2488 vec!["23.0.0", "21.0.0", "19.0.0"]
2489 }
2490 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
2491 vec!["16.00.4085.2.v1", "15.00.4322.2.v1"]
2492 }
2493 "db2-se" | "db2-ae" => vec!["11.5.9.0.sb00000000.r1", "11.5.8.0.sb00000000.r1"],
2494 _ => vec![],
2495 };
2496
2497 if !supported_versions.contains(&engine_version) {
2498 return Err(AwsServiceError::aws_error(
2499 StatusCode::BAD_REQUEST,
2500 "InvalidParameterValue",
2501 format!("EngineVersion '{engine_version}' is not supported yet."),
2502 ));
2503 }
2504 validate_db_instance_class(db_instance_class)?;
2505 Ok(())
2506}
2507
2508fn validate_db_instance_class(db_instance_class: &str) -> Result<(), AwsServiceError> {
2509 if !crate::state::SUPPORTED_INSTANCE_CLASSES.contains(&db_instance_class) {
2510 return Err(AwsServiceError::aws_error(
2511 StatusCode::BAD_REQUEST,
2512 "InvalidParameterValue",
2513 format!("DBInstanceClass '{}' is not supported.", db_instance_class),
2514 ));
2515 }
2516 Ok(())
2517}
2518
2519fn filter_engine_versions(
2520 versions: &[EngineVersionInfo],
2521 engine: &Option<String>,
2522 engine_version: &Option<String>,
2523 family: &Option<String>,
2524) -> Vec<EngineVersionInfo> {
2525 versions
2526 .iter()
2527 .filter(|candidate| {
2528 engine
2529 .as_ref()
2530 .is_none_or(|expected| candidate.engine == *expected)
2531 })
2532 .filter(|candidate| {
2533 engine_version
2534 .as_ref()
2535 .is_none_or(|expected| candidate.engine_version == *expected)
2536 })
2537 .filter(|candidate| {
2538 family
2539 .as_ref()
2540 .is_none_or(|expected| candidate.db_parameter_group_family == *expected)
2541 })
2542 .cloned()
2543 .collect()
2544}
2545
2546fn filter_orderable_options(
2547 options: &[OrderableDbInstanceOption],
2548 engine: &Option<String>,
2549 engine_version: &Option<String>,
2550 db_instance_class: &Option<String>,
2551 license_model: &Option<String>,
2552 vpc: Option<bool>,
2553) -> Vec<OrderableDbInstanceOption> {
2554 options
2555 .iter()
2556 .filter(|candidate| {
2557 engine
2558 .as_ref()
2559 .is_none_or(|expected| candidate.engine == *expected)
2560 })
2561 .filter(|candidate| {
2562 engine_version
2563 .as_ref()
2564 .is_none_or(|expected| candidate.engine_version == *expected)
2565 })
2566 .filter(|candidate| {
2567 db_instance_class
2568 .as_ref()
2569 .is_none_or(|expected| candidate.db_instance_class == *expected)
2570 })
2571 .filter(|candidate| {
2572 license_model
2573 .as_ref()
2574 .is_none_or(|expected| candidate.license_model == *expected)
2575 })
2576 .filter(|_| vpc.unwrap_or(true))
2577 .cloned()
2578 .collect()
2579}
2580
2581#[allow(clippy::too_many_arguments)]
2585fn build_restored_instance(
2589 db_instance_identifier: &str,
2590 db_instance_arn: String,
2591 dbi_resource_id: String,
2592 created_at: chrono::DateTime<Utc>,
2593 vpc_security_group_ids: Vec<String>,
2594 snapshot: &DbSnapshot,
2595 running: &crate::runtime::RunningDbContainer,
2596) -> DbInstance {
2597 DbInstance {
2598 db_instance_identifier: db_instance_identifier.to_string(),
2599 db_instance_arn,
2600 db_instance_class: "db.t3.micro".to_string(),
2601 engine: snapshot.engine.clone(),
2602 engine_version: snapshot.engine_version.clone(),
2603 db_instance_status: "available".to_string(),
2604 master_username: snapshot.master_username.clone(),
2605 db_name: snapshot.db_name.clone(),
2606 endpoint_address: "127.0.0.1".to_string(),
2607 port: i32::from(running.host_port),
2608 allocated_storage: snapshot.allocated_storage,
2609 publicly_accessible: true,
2610 deletion_protection: false,
2611 created_at,
2612 dbi_resource_id,
2613 master_user_password: snapshot.master_user_password.clone(),
2614 container_id: running.container_id.clone(),
2615 host_port: running.host_port,
2616 tags: Vec::new(),
2617 read_replica_source_db_instance_identifier: None,
2618 read_replica_db_instance_identifiers: Vec::new(),
2619 vpc_security_group_ids,
2620 db_parameter_group_name: None,
2621 backup_retention_period: 1,
2622 preferred_backup_window: "03:00-04:00".to_string(),
2623 latest_restorable_time: Some(created_at),
2624 option_group_name: None,
2625 multi_az: false,
2626 pending_modified_values: None,
2627 }
2628}
2629
2630fn build_read_replica_instance(
2631 db_instance_identifier: &str,
2632 db_instance_arn: String,
2633 dbi_resource_id: String,
2634 created_at: chrono::DateTime<Utc>,
2635 source_db_instance_identifier: &str,
2636 source: &DbInstance,
2637 running: &crate::runtime::RunningDbContainer,
2638) -> DbInstance {
2639 DbInstance {
2640 db_instance_identifier: db_instance_identifier.to_string(),
2641 db_instance_arn,
2642 db_instance_class: source.db_instance_class.clone(),
2643 engine: source.engine.clone(),
2644 engine_version: source.engine_version.clone(),
2645 db_instance_status: "available".to_string(),
2646 master_username: source.master_username.clone(),
2647 db_name: source.db_name.clone(),
2648 endpoint_address: "127.0.0.1".to_string(),
2649 port: i32::from(running.host_port),
2650 allocated_storage: source.allocated_storage,
2651 publicly_accessible: source.publicly_accessible,
2652 deletion_protection: false,
2653 created_at,
2654 dbi_resource_id,
2655 master_user_password: source.master_user_password.clone(),
2656 container_id: running.container_id.clone(),
2657 host_port: running.host_port,
2658 tags: Vec::new(),
2659 read_replica_source_db_instance_identifier: Some(source_db_instance_identifier.to_string()),
2660 read_replica_db_instance_identifiers: Vec::new(),
2661 vpc_security_group_ids: source.vpc_security_group_ids.clone(),
2662 db_parameter_group_name: source.db_parameter_group_name.clone(),
2663 backup_retention_period: source.backup_retention_period,
2664 preferred_backup_window: source.preferred_backup_window.clone(),
2665 latest_restorable_time: if source.backup_retention_period > 0 {
2666 Some(created_at)
2667 } else {
2668 None
2669 },
2670 option_group_name: source.option_group_name.clone(),
2671 multi_az: source.multi_az,
2672 pending_modified_values: None,
2673 }
2674}
2675
2676fn xml_wrap(action: &str, inner: &str, request_id: &str) -> String {
2677 fakecloud_core::query::query_response_xml(action, RDS_NS, inner, request_id)
2678}
2679
2680fn engine_version_xml(version: &EngineVersionInfo) -> String {
2681 format!(
2682 "<DBEngineVersion>\
2683 <Engine>{}</Engine>\
2684 <EngineVersion>{}</EngineVersion>\
2685 <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2686 <DBEngineDescription>{}</DBEngineDescription>\
2687 <DBEngineVersionDescription>{}</DBEngineVersionDescription>\
2688 <Status>{}</Status>\
2689 </DBEngineVersion>",
2690 xml_escape(&version.engine),
2691 xml_escape(&version.engine_version),
2692 xml_escape(&version.db_parameter_group_family),
2693 xml_escape(&version.db_engine_description),
2694 xml_escape(&version.db_engine_version_description),
2695 xml_escape(&version.status),
2696 )
2697}
2698
2699fn orderable_option_xml(option: &OrderableDbInstanceOption) -> String {
2700 format!(
2701 "<OrderableDBInstanceOption>\
2702 <Engine>{}</Engine>\
2703 <EngineVersion>{}</EngineVersion>\
2704 <DBInstanceClass>{}</DBInstanceClass>\
2705 <LicenseModel>{}</LicenseModel>\
2706 <AvailabilityZones><AvailabilityZone><Name>us-east-1a</Name></AvailabilityZone></AvailabilityZones>\
2707 <MultiAZCapable>true</MultiAZCapable>\
2708 <ReadReplicaCapable>true</ReadReplicaCapable>\
2709 <Vpc>true</Vpc>\
2710 <SupportsStorageEncryption>true</SupportsStorageEncryption>\
2711 <StorageType>{}</StorageType>\
2712 <SupportsIops>false</SupportsIops>\
2713 <MinStorageSize>{}</MinStorageSize>\
2714 <MaxStorageSize>{}</MaxStorageSize>\
2715 <SupportsIAMDatabaseAuthentication>true</SupportsIAMDatabaseAuthentication>\
2716 </OrderableDBInstanceOption>",
2717 xml_escape(&option.engine),
2718 xml_escape(&option.engine_version),
2719 xml_escape(&option.db_instance_class),
2720 xml_escape(&option.license_model),
2721 xml_escape(&option.storage_type),
2722 option.min_storage_size,
2723 option.max_storage_size,
2724 )
2725}
2726
2727fn tag_xml(tag: &RdsTag) -> String {
2728 format!(
2729 "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2730 xml_escape(&tag.key),
2731 xml_escape(&tag.value),
2732 )
2733}
2734
2735pub(crate) fn emit_event_static(
2738 delivery_bus: Option<&Arc<DeliveryBus>>,
2739 source_type: RdsSourceType,
2740 source_identifier: &str,
2741 source_arn: &str,
2742 event_id: &str,
2743 event_categories: &[&str],
2744 message: &str,
2745) {
2746 let Some(bus) = delivery_bus else {
2747 return;
2748 };
2749 let detail = serde_json::json!({
2750 "EventCategories": event_categories,
2751 "SourceType": source_type.as_str(),
2752 "SourceArn": source_arn,
2753 "Date": Utc::now().to_rfc3339(),
2754 "Message": message,
2755 "SourceIdentifier": source_identifier,
2756 "EventID": event_id,
2757 });
2758 bus.put_event_to_eventbridge(
2759 "aws.rds",
2760 source_type.detail_type(),
2761 &detail.to_string(),
2762 "default",
2763 );
2764}
2765
2766fn db_instance_xml(instance: &DbInstance, status_override: Option<&str>) -> String {
2767 let status = status_override.unwrap_or(&instance.db_instance_status);
2768 let db_name_xml = instance
2769 .db_name
2770 .as_ref()
2771 .map(|db_name| format!("<DBName>{}</DBName>", xml_escape(db_name)))
2772 .unwrap_or_default();
2773
2774 let read_replica_source_xml = instance
2775 .read_replica_source_db_instance_identifier
2776 .as_ref()
2777 .map(|source| {
2778 format!(
2779 "<ReadReplicaSourceDBInstanceIdentifier>{}</ReadReplicaSourceDBInstanceIdentifier>",
2780 xml_escape(source)
2781 )
2782 })
2783 .unwrap_or_default();
2784
2785 let read_replica_identifiers_xml = if instance.read_replica_db_instance_identifiers.is_empty() {
2786 "<ReadReplicaDBInstanceIdentifiers/>".to_string()
2787 } else {
2788 format!(
2789 "<ReadReplicaDBInstanceIdentifiers>{}</ReadReplicaDBInstanceIdentifiers>",
2790 instance
2791 .read_replica_db_instance_identifiers
2792 .iter()
2793 .map(|id| format!(
2794 "<ReadReplicaDBInstanceIdentifier>{}</ReadReplicaDBInstanceIdentifier>",
2795 xml_escape(id)
2796 ))
2797 .collect::<String>()
2798 )
2799 };
2800
2801 let vpc_security_groups_xml = if instance.vpc_security_group_ids.is_empty() {
2802 "<VpcSecurityGroups/>".to_string()
2803 } else {
2804 format!(
2805 "<VpcSecurityGroups>{}</VpcSecurityGroups>",
2806 instance
2807 .vpc_security_group_ids
2808 .iter()
2809 .map(|sg_id| format!(
2810 "<VpcSecurityGroupMembership>\
2811 <VpcSecurityGroupId>{}</VpcSecurityGroupId>\
2812 <Status>active</Status>\
2813 </VpcSecurityGroupMembership>",
2814 xml_escape(sg_id)
2815 ))
2816 .collect::<String>()
2817 )
2818 };
2819
2820 let db_parameter_groups_xml = match &instance.db_parameter_group_name {
2821 Some(pg_name) => format!(
2822 "<DBParameterGroups>\
2823 <DBParameterGroup>\
2824 <DBParameterGroupName>{}</DBParameterGroupName>\
2825 <ParameterApplyStatus>in-sync</ParameterApplyStatus>\
2826 </DBParameterGroup>\
2827 </DBParameterGroups>",
2828 xml_escape(pg_name)
2829 ),
2830 None => "<DBParameterGroups/>".to_string(),
2831 };
2832
2833 let option_group_memberships_xml = match &instance.option_group_name {
2834 Some(og_name) => format!(
2835 "<OptionGroupMemberships>\
2836 <OptionGroupMembership>\
2837 <OptionGroupName>{}</OptionGroupName>\
2838 <Status>in-sync</Status>\
2839 </OptionGroupMembership>\
2840 </OptionGroupMemberships>",
2841 xml_escape(og_name)
2842 ),
2843 None => "<OptionGroupMemberships/>".to_string(),
2844 };
2845
2846 let pending_modified_values_xml = if let Some(ref pending) = instance.pending_modified_values {
2847 let mut fields = Vec::new();
2848 if let Some(ref class) = pending.db_instance_class {
2849 fields.push(format!(
2850 "<DBInstanceClass>{}</DBInstanceClass>",
2851 xml_escape(class)
2852 ));
2853 }
2854 if let Some(allocated_storage) = pending.allocated_storage {
2855 fields.push(format!(
2856 "<AllocatedStorage>{}</AllocatedStorage>",
2857 allocated_storage
2858 ));
2859 }
2860 if let Some(backup_retention_period) = pending.backup_retention_period {
2861 fields.push(format!(
2862 "<BackupRetentionPeriod>{}</BackupRetentionPeriod>",
2863 backup_retention_period
2864 ));
2865 }
2866 if let Some(multi_az) = pending.multi_az {
2867 fields.push(format!(
2868 "<MultiAZ>{}</MultiAZ>",
2869 if multi_az { "true" } else { "false" }
2870 ));
2871 }
2872 if let Some(ref engine_version) = pending.engine_version {
2873 fields.push(format!(
2874 "<EngineVersion>{}</EngineVersion>",
2875 xml_escape(engine_version)
2876 ));
2877 }
2878 if pending.master_user_password.is_some() {
2879 fields.push("<MasterUserPassword>****</MasterUserPassword>".to_string());
2880 }
2881 if !fields.is_empty() {
2882 format!(
2883 "<PendingModifiedValues>{}</PendingModifiedValues>",
2884 fields.join("")
2885 )
2886 } else {
2887 String::new()
2888 }
2889 } else {
2890 String::new()
2891 };
2892
2893 let latest_restorable_time_xml = instance
2894 .latest_restorable_time
2895 .map(|t| {
2896 format!(
2897 "<LatestRestorableTime>{}</LatestRestorableTime>",
2898 t.to_rfc3339()
2899 )
2900 })
2901 .unwrap_or_default();
2902
2903 let endpoint_xml = if instance.endpoint_address.is_empty() || instance.port == 0 {
2908 String::new()
2909 } else {
2910 format!(
2911 "<Endpoint><Address>{}</Address><Port>{}</Port></Endpoint>",
2912 xml_escape(&instance.endpoint_address),
2913 instance.port
2914 )
2915 };
2916
2917 format!(
2918 "<DBInstanceIdentifier>{identifier}</DBInstanceIdentifier>\
2919 <DBInstanceClass>{class}</DBInstanceClass>\
2920 <Engine>{engine}</Engine>\
2921 <DBInstanceStatus>{status}</DBInstanceStatus>\
2922 <MasterUsername>{master_username}</MasterUsername>\
2923 {db_name_xml}\
2924 {endpoint_xml}\
2925 <AllocatedStorage>{allocated_storage}</AllocatedStorage>\
2926 <InstanceCreateTime>{create_time}</InstanceCreateTime>\
2927 <PreferredBackupWindow>{preferred_backup_window}</PreferredBackupWindow>\
2928 <BackupRetentionPeriod>{backup_retention_period}</BackupRetentionPeriod>\
2929 <DBSecurityGroups/>\
2930 {vpc_security_groups_xml}\
2931 {db_parameter_groups_xml}\
2932 <AvailabilityZone>us-east-1a</AvailabilityZone>\
2933 {latest_restorable_time_xml}\
2934 <PreferredMaintenanceWindow>sun:00:00-sun:00:30</PreferredMaintenanceWindow>\
2935 <MultiAZ>{multi_az}</MultiAZ>\
2936 <EngineVersion>{engine_version}</EngineVersion>\
2937 <AutoMinorVersionUpgrade>true</AutoMinorVersionUpgrade>\
2938 {read_replica_identifiers_xml}\
2939 {read_replica_source_xml}\
2940 <LicenseModel>{license_model}</LicenseModel>\
2941 {option_group_memberships_xml}\
2942 <PubliclyAccessible>{publicly_accessible}</PubliclyAccessible>\
2943 <StorageType>gp2</StorageType>\
2944 <DbInstancePort>{port}</DbInstancePort>\
2945 <StorageEncrypted>false</StorageEncrypted>\
2946 <DbiResourceId>{dbi_resource_id}</DbiResourceId>\
2947 <DeletionProtection>{deletion_protection}</DeletionProtection>\
2948 {pending_modified_values_xml}\
2949 <DBInstanceArn>{arn}</DBInstanceArn>",
2950 identifier = xml_escape(&instance.db_instance_identifier),
2951 class = xml_escape(&instance.db_instance_class),
2952 engine = xml_escape(&instance.engine),
2953 status = xml_escape(status),
2954 master_username = xml_escape(&instance.master_username),
2955 port = instance.port,
2956 allocated_storage = instance.allocated_storage,
2957 create_time = instance.created_at.to_rfc3339(),
2958 preferred_backup_window = xml_escape(&instance.preferred_backup_window),
2959 backup_retention_period = instance.backup_retention_period,
2960 multi_az = if instance.multi_az { "true" } else { "false" },
2961 engine_version = xml_escape(&instance.engine_version),
2962 license_model = license_model_for_engine(&instance.engine),
2963 publicly_accessible = if instance.publicly_accessible {
2964 "true"
2965 } else {
2966 "false"
2967 },
2968 dbi_resource_id = xml_escape(&instance.dbi_resource_id),
2969 deletion_protection = if instance.deletion_protection {
2970 "true"
2971 } else {
2972 "false"
2973 },
2974 arn = xml_escape(&instance.db_instance_arn),
2975 )
2976}
2977
2978fn db_snapshot_xml(snapshot: &DbSnapshot) -> String {
2979 format!(
2980 "<DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\
2981 <DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2982 <SnapshotCreateTime>{}</SnapshotCreateTime>\
2983 <Engine>{}</Engine>\
2984 <EngineVersion>{}</EngineVersion>\
2985 <AllocatedStorage>{}</AllocatedStorage>\
2986 <Status>{}</Status>\
2987 <Port>{}</Port>\
2988 <MasterUsername>{}</MasterUsername>\
2989 {}\
2990 <DbiResourceId>{}</DbiResourceId>\
2991 <SnapshotType>{}</SnapshotType>\
2992 <DBSnapshotArn>{}</DBSnapshotArn>",
2993 xml_escape(&snapshot.db_snapshot_identifier),
2994 xml_escape(&snapshot.db_instance_identifier),
2995 snapshot.snapshot_create_time.to_rfc3339(),
2996 xml_escape(&snapshot.engine),
2997 xml_escape(&snapshot.engine_version),
2998 snapshot.allocated_storage,
2999 xml_escape(&snapshot.status),
3000 snapshot.port,
3001 xml_escape(&snapshot.master_username),
3002 snapshot
3003 .db_name
3004 .as_ref()
3005 .map(|name| format!("<DBName>{}</DBName>", xml_escape(name)))
3006 .unwrap_or_default(),
3007 xml_escape(&snapshot.dbi_resource_id),
3008 xml_escape(&snapshot.snapshot_type),
3009 xml_escape(&snapshot.db_snapshot_arn),
3010 )
3011}
3012
3013fn db_subnet_group_xml(subnet_group: &DbSubnetGroup) -> String {
3014 let subnets_xml = subnet_group
3015 .subnet_ids
3016 .iter()
3017 .zip(&subnet_group.subnet_availability_zones)
3018 .map(|(subnet_id, az)| {
3019 format!(
3020 "<Subnet>\
3021 <SubnetIdentifier>{}</SubnetIdentifier>\
3022 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
3023 <SubnetStatus>Active</SubnetStatus>\
3024 </Subnet>",
3025 xml_escape(subnet_id),
3026 xml_escape(az)
3027 )
3028 })
3029 .collect::<String>();
3030
3031 format!(
3032 "<DBSubnetGroupName>{}</DBSubnetGroupName>\
3033 <DBSubnetGroupDescription>{}</DBSubnetGroupDescription>\
3034 <VpcId>{}</VpcId>\
3035 <SubnetGroupStatus>Complete</SubnetGroupStatus>\
3036 <Subnets>{}</Subnets>\
3037 <DBSubnetGroupArn>{}</DBSubnetGroupArn>",
3038 xml_escape(&subnet_group.db_subnet_group_name),
3039 xml_escape(&subnet_group.db_subnet_group_description),
3040 xml_escape(&subnet_group.vpc_id),
3041 subnets_xml,
3042 xml_escape(&subnet_group.db_subnet_group_arn),
3043 )
3044}
3045
3046fn db_parameter_group_xml(parameter_group: &DbParameterGroup) -> String {
3047 format!(
3048 "<DBParameterGroupName>{}</DBParameterGroupName>\
3049 <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
3050 <Description>{}</Description>\
3051 <DBParameterGroupArn>{}</DBParameterGroupArn>",
3052 xml_escape(¶meter_group.db_parameter_group_name),
3053 xml_escape(¶meter_group.db_parameter_group_family),
3054 xml_escape(¶meter_group.description),
3055 xml_escape(¶meter_group.db_parameter_group_arn),
3056 )
3057}
3058
3059fn db_instance_not_found(identifier: &str) -> AwsServiceError {
3060 AwsServiceError::aws_error(
3061 StatusCode::NOT_FOUND,
3062 "DBInstanceNotFound",
3063 format!("DBInstance {} not found.", identifier),
3064 )
3065}
3066
3067fn db_snapshot_not_found(identifier: &str) -> AwsServiceError {
3068 AwsServiceError::aws_error(
3069 StatusCode::NOT_FOUND,
3070 "DBSnapshotNotFound",
3071 format!("DBSnapshot {} not found.", identifier),
3072 )
3073}
3074
3075fn db_instance_not_found_by_arn(resource_name: &str) -> AwsServiceError {
3076 AwsServiceError::aws_error(
3077 StatusCode::NOT_FOUND,
3078 "DBInstanceNotFound",
3079 format!("DBInstance {resource_name} not found."),
3080 )
3081}
3082
3083fn find_instance_by_arn<'a>(
3084 state: &'a crate::state::RdsState,
3085 resource_name: &str,
3086) -> Result<&'a DbInstance, AwsServiceError> {
3087 state
3088 .instances
3089 .values()
3090 .find(|instance| instance.db_instance_arn == resource_name)
3091 .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
3092}
3093
3094fn find_instance_by_arn_mut<'a>(
3095 state: &'a mut crate::state::RdsState,
3096 resource_name: &str,
3097) -> Result<&'a mut DbInstance, AwsServiceError> {
3098 state
3099 .instances
3100 .values_mut()
3101 .find(|instance| instance.db_instance_arn == resource_name)
3102 .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
3103}
3104
3105fn merge_tags(existing: &mut Vec<RdsTag>, incoming: &[RdsTag]) {
3106 for tag in incoming {
3107 if let Some(existing_tag) = existing
3108 .iter_mut()
3109 .find(|candidate| candidate.key == tag.key)
3110 {
3111 existing_tag.value = tag.value.clone();
3112 } else {
3113 existing.push(tag.clone());
3114 }
3115 }
3116}
3117
3118fn license_model_for_engine(engine: &str) -> &'static str {
3119 match engine {
3125 "mysql" | "mariadb" => "general-public-license",
3126 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "license-included",
3127 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "license-included",
3128 "db2-se" | "db2-ae" => "bring-your-own-license",
3129 _ => "postgresql-license",
3130 }
3131}
3132
3133fn default_db_name(engine: &str) -> &'static str {
3134 match engine {
3135 "mysql" | "mariadb" => "mysql",
3136 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "ORCL",
3141 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "master",
3145 "db2-se" | "db2-ae" => "BLUDB",
3146 _ => "postgres",
3147 }
3148}
3149
3150fn default_port_for_engine(engine: &str) -> i32 {
3154 match engine {
3155 "postgres" => 5432,
3156 "mysql" | "mariadb" => 3306,
3157 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => 1521,
3158 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => 1433,
3159 "db2-se" | "db2-ae" => 50000,
3160 _ => 5432,
3161 }
3162}
3163
3164fn default_parameter_group(engine: &str, engine_version: &str) -> String {
3170 match engine {
3171 "postgres" => {
3172 let major = engine_version.split('.').next().unwrap_or("16");
3173 format!("default.postgres{}", major)
3174 }
3175 "mysql" => {
3176 let major = if engine_version.starts_with("5.7") {
3177 "5.7"
3178 } else {
3179 "8.0"
3180 };
3181 format!("default.mysql{}", major)
3182 }
3183 "mariadb" => {
3184 let major = if engine_version.starts_with("11.4") {
3185 "11.4"
3186 } else if engine_version.starts_with("10.11") {
3187 "10.11"
3188 } else {
3189 "10.6"
3190 };
3191 format!("default.mariadb{}", major)
3192 }
3193 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
3194 let major = engine_version.split('.').next().unwrap_or("23");
3195 format!("default.{engine}-{major}")
3196 }
3197 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
3198 let major = engine_version.split('.').next().unwrap_or("16");
3201 format!("default.{engine}-{major}")
3202 }
3203 "db2-se" | "db2-ae" => {
3204 let mut parts = engine_version.split('.');
3207 let major = parts.next().unwrap_or("11");
3208 let minor = parts.next().unwrap_or("5");
3209 format!("default.{engine}-{major}.{minor}")
3210 }
3211 _ => "default.postgres16".to_string(),
3212 }
3213}
3214
3215fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
3216 match error {
3217 RuntimeError::Unavailable => AwsServiceError::aws_error(
3218 StatusCode::SERVICE_UNAVAILABLE,
3219 "InvalidParameterValue",
3220 "Docker/Podman is required for RDS DB instances but is not available",
3221 ),
3222 RuntimeError::ContainerStartFailed(message) => AwsServiceError::aws_error(
3223 StatusCode::INTERNAL_SERVER_ERROR,
3224 "InternalFailure",
3225 message,
3226 ),
3227 }
3228}
3229
3230#[cfg(test)]
3231mod tests {
3232 use std::collections::HashMap;
3233 use std::sync::Arc;
3234
3235 use bytes::Bytes;
3236 use chrono::Utc;
3237 use http::{HeaderMap, Method};
3238 use parking_lot::RwLock;
3239 use uuid::Uuid;
3240
3241 use super::{
3242 db_instance_xml, default_db_name, default_parameter_group, default_port_for_engine,
3243 filter_engine_versions, filter_orderable_options, license_model_for_engine, merge_tags,
3244 optional_i32_param, parse_tag_keys, parse_tags, validate_create_request, RdsService,
3245 RdsSourceType,
3246 };
3247 use crate::state::{default_engine_versions, default_orderable_options, DbInstance, RdsTag};
3248 use fakecloud_core::delivery::DeliveryBus;
3249 use fakecloud_core::service::{AwsRequest, AwsService, AwsServiceError};
3250
3251 #[test]
3252 fn default_port_matches_aws_for_each_engine() {
3253 assert_eq!(default_port_for_engine("postgres"), 5432);
3254 assert_eq!(default_port_for_engine("mysql"), 3306);
3255 assert_eq!(default_port_for_engine("mariadb"), 3306);
3256 assert_eq!(default_port_for_engine("oracle-ee"), 1521);
3257 assert_eq!(default_port_for_engine("oracle-se2"), 1521);
3258 assert_eq!(default_port_for_engine("sqlserver-ee"), 1433);
3259 assert_eq!(default_port_for_engine("sqlserver-ex"), 1433);
3260 assert_eq!(default_port_for_engine("db2-se"), 50000);
3261 assert_eq!(default_port_for_engine("db2-ae"), 50000);
3262 }
3263
3264 #[test]
3265 fn default_parameter_group_uses_engine_major_version() {
3266 assert_eq!(
3267 default_parameter_group("postgres", "16.3"),
3268 "default.postgres16"
3269 );
3270 assert_eq!(
3271 default_parameter_group("mysql", "8.0.35"),
3272 "default.mysql8.0"
3273 );
3274 assert_eq!(
3275 default_parameter_group("oracle-ee", "23.0.0"),
3276 "default.oracle-ee-23"
3277 );
3278 assert_eq!(
3279 default_parameter_group("sqlserver-ex", "16.00.4085.2.v1"),
3280 "default.sqlserver-ex-16"
3281 );
3282 assert_eq!(
3283 default_parameter_group("db2-se", "11.5.9.0.sb00000000.r1"),
3284 "default.db2-se-11.5"
3285 );
3286 }
3287
3288 #[test]
3289 fn license_model_reflects_engine_class() {
3290 assert_eq!(license_model_for_engine("postgres"), "postgresql-license");
3291 assert_eq!(license_model_for_engine("mysql"), "general-public-license");
3292 assert_eq!(license_model_for_engine("oracle-ee"), "license-included");
3293 assert_eq!(license_model_for_engine("sqlserver-se"), "license-included");
3294 assert_eq!(license_model_for_engine("db2-ae"), "bring-your-own-license");
3295 }
3296
3297 #[test]
3298 fn default_db_name_picks_per_engine_default() {
3299 assert_eq!(default_db_name("postgres"), "postgres");
3300 assert_eq!(default_db_name("mysql"), "mysql");
3301 assert_eq!(default_db_name("oracle-ee"), "ORCL");
3302 assert_eq!(default_db_name("sqlserver-ex"), "master");
3303 assert_eq!(default_db_name("db2-se"), "BLUDB");
3304 }
3305
3306 #[test]
3307 fn validate_create_request_accepts_new_engines() {
3308 for (engine, version, port) in [
3309 ("oracle-ee", "23.0.0", 1521),
3310 ("sqlserver-ex", "16.00.4085.2.v1", 1433),
3311 ("db2-se", "11.5.9.0.sb00000000.r1", 50000),
3312 ] {
3313 validate_create_request("test-db", 20, "db.t3.micro", engine, version, port)
3314 .expect("engine should be accepted");
3315 }
3316 }
3317
3318 #[test]
3319 fn validate_create_request_rejects_unsupported_engine_version() {
3320 let err =
3321 validate_create_request("test-db", 20, "db.t3.micro", "oracle-ee", "12.0.0", 1521)
3322 .expect_err("12.x is not in the supported list");
3323 let msg = format!("{err:?}");
3324 assert!(msg.contains("EngineVersion"), "unexpected: {msg}");
3325 }
3326
3327 #[test]
3328 fn filter_engine_versions_matches_requested_engine() {
3329 let versions = default_engine_versions();
3330
3331 let filtered =
3332 filter_engine_versions(&versions, &Some("postgres".to_string()), &None, &None);
3333
3334 assert_eq!(filtered.len(), 4); assert!(filtered.iter().all(|v| v.engine == "postgres"));
3336 }
3337
3338 #[test]
3339 fn filter_orderable_options_respects_instance_class() {
3340 let options = default_orderable_options();
3341
3342 let filtered = filter_orderable_options(
3343 &options,
3344 &Some("postgres".to_string()),
3345 &Some("16.3".to_string()),
3346 &Some("db.t3.micro".to_string()),
3347 &None,
3348 Some(true),
3349 );
3350
3351 assert_eq!(filtered.len(), 1);
3352 assert_eq!(filtered[0].db_instance_class, "db.t3.micro");
3353 }
3354
3355 #[test]
3356 fn validate_create_request_rejects_unsupported_engine() {
3357 let error = validate_create_request("test-db", 20, "db.t3.micro", "mysql", "16.3", 5432)
3358 .expect_err("unsupported engine");
3359
3360 assert_eq!(error.code(), "InvalidParameterValue");
3361 }
3362
3363 #[test]
3364 fn optional_i32_param_rejects_invalid_integer() {
3365 let request = request("CreateDBInstance", &[("Port", "not-a-number")]);
3366
3367 let error = optional_i32_param(&request, "Port").expect_err("invalid port");
3368
3369 assert_eq!(error.code(), "InvalidParameterValue");
3370 }
3371
3372 #[test]
3373 fn db_instance_xml_renders_endpoint_and_status() {
3374 let created_at = Utc::now();
3375 let instance = DbInstance {
3376 db_instance_identifier: "test-db".to_string(),
3377 db_instance_arn: "arn:aws:rds:us-east-1:123456789012:db:test-db".to_string(),
3378 db_instance_class: "db.t3.micro".to_string(),
3379 engine: "postgres".to_string(),
3380 engine_version: "16.3".to_string(),
3381 db_instance_status: "available".to_string(),
3382 master_username: "admin".to_string(),
3383 db_name: Some("appdb".to_string()),
3384 endpoint_address: "127.0.0.1".to_string(),
3385 port: 15432,
3386 allocated_storage: 20,
3387 publicly_accessible: true,
3388 deletion_protection: false,
3389 created_at,
3390 dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3391 master_user_password: "secret123".to_string(),
3392 container_id: "container".to_string(),
3393 host_port: 15432,
3394 tags: Vec::new(),
3395 read_replica_source_db_instance_identifier: None,
3396 read_replica_db_instance_identifiers: Vec::new(),
3397 vpc_security_group_ids: vec!["sg-12345678".to_string()],
3398 db_parameter_group_name: Some("default.postgres16".to_string()),
3399 backup_retention_period: 1,
3400 preferred_backup_window: "03:00-04:00".to_string(),
3401 latest_restorable_time: Some(created_at),
3402 option_group_name: None,
3403 multi_az: false,
3404 pending_modified_values: None,
3405 };
3406
3407 let xml = db_instance_xml(&instance, Some("creating"));
3408
3409 assert!(xml.contains("<DBInstanceIdentifier>test-db</DBInstanceIdentifier>"));
3410 assert!(xml.contains("<DBInstanceStatus>creating</DBInstanceStatus>"));
3411 assert!(xml.contains("<Address>127.0.0.1</Address><Port>15432</Port>"));
3412 }
3413
3414 #[test]
3415 fn parse_tags_reads_rds_query_shape() {
3416 let request = request(
3417 "AddTagsToResource",
3418 &[
3419 ("Tags.Tag.1.Key", "env"),
3420 ("Tags.Tag.1.Value", "dev"),
3421 ("Tags.Tag.2.Key", "team"),
3422 ("Tags.Tag.2.Value", "core"),
3423 ],
3424 );
3425
3426 let tags = parse_tags(&request).expect("tags");
3427
3428 assert_eq!(
3429 tags,
3430 vec![
3431 RdsTag {
3432 key: "env".to_string(),
3433 value: "dev".to_string(),
3434 },
3435 RdsTag {
3436 key: "team".to_string(),
3437 value: "core".to_string(),
3438 }
3439 ]
3440 );
3441 }
3442
3443 #[test]
3444 fn parse_tag_keys_reads_member_shape() {
3445 let request = request(
3446 "RemoveTagsFromResource",
3447 &[("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")],
3448 );
3449
3450 let tag_keys = parse_tag_keys(&request).expect("tag keys");
3451
3452 assert_eq!(tag_keys, vec!["env".to_string(), "team".to_string()]);
3453 }
3454
3455 #[test]
3456 fn merge_tags_updates_existing_values() {
3457 let mut tags = vec![RdsTag {
3458 key: "env".to_string(),
3459 value: "dev".to_string(),
3460 }];
3461
3462 merge_tags(
3463 &mut tags,
3464 &[
3465 RdsTag {
3466 key: "env".to_string(),
3467 value: "prod".to_string(),
3468 },
3469 RdsTag {
3470 key: "team".to_string(),
3471 value: "core".to_string(),
3472 },
3473 ],
3474 );
3475
3476 assert_eq!(tags.len(), 2);
3477 assert_eq!(tags[0].value, "prod");
3478 assert_eq!(tags[1].key, "team");
3479 }
3480
3481 #[tokio::test]
3482 async fn describe_engine_versions_returns_xml_body() {
3483 let service = RdsService::new(Arc::new(RwLock::new(
3484 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3485 )));
3486 let request = request("DescribeDBEngineVersions", &[("Engine", "postgres")]);
3487
3488 let response = service.handle(request).await.expect("response");
3489 let body = String::from_utf8(response.body.expect_bytes().to_vec()).expect("utf8");
3490
3491 assert!(body.contains("<DescribeDBEngineVersionsResponse"));
3492 assert!(body.contains("<Engine>postgres</Engine>"));
3493 assert!(body.contains("<DBParameterGroupFamily>postgres16</DBParameterGroupFamily>"));
3494 }
3495
3496 fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
3497 let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
3498 for (key, value) in params {
3499 query_params.insert((*key).to_string(), (*value).to_string());
3500 }
3501
3502 AwsRequest {
3503 service: "rds".to_string(),
3504 action: action.to_string(),
3505 region: "us-east-1".to_string(),
3506 account_id: "123456789012".to_string(),
3507 request_id: "test-request-id".to_string(),
3508 headers: HeaderMap::new(),
3509 query_params,
3510 body: Bytes::new(),
3511 body_stream: parking_lot::Mutex::new(None),
3512 path_segments: vec![],
3513 raw_path: "/".to_string(),
3514 raw_query: String::new(),
3515 method: Method::POST,
3516 is_query_protocol: true,
3517 access_key_id: None,
3518 principal: None,
3519 }
3520 }
3521
3522 fn make_service() -> RdsService {
3525 RdsService::new(Arc::new(RwLock::new(
3526 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3527 )))
3528 }
3529
3530 #[derive(Default)]
3531 struct CapturedEvent {
3532 source: String,
3533 detail_type: String,
3534 detail: String,
3535 }
3536
3537 #[derive(Default)]
3538 struct RecordingEb {
3539 events: std::sync::Mutex<Vec<CapturedEvent>>,
3540 }
3541
3542 impl fakecloud_core::delivery::EventBridgeDelivery for RecordingEb {
3543 fn put_event(&self, source: &str, detail_type: &str, detail: &str, _bus: &str) {
3544 self.events.lock().unwrap().push(CapturedEvent {
3545 source: source.to_string(),
3546 detail_type: detail_type.to_string(),
3547 detail: detail.to_string(),
3548 });
3549 }
3550 }
3551
3552 fn make_service_with_recorder() -> (RdsService, Arc<RecordingEb>) {
3553 let recorder = Arc::new(RecordingEb::default());
3554 let bus = Arc::new(DeliveryBus::new().with_eventbridge(recorder.clone()));
3555 let svc = RdsService::new(Arc::new(RwLock::new(
3556 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3557 )))
3558 .with_delivery_bus(bus);
3559 (svc, recorder)
3560 }
3561
3562 #[test]
3563 fn emit_event_emits_aws_rds_event_via_bus() {
3564 let (svc, rec) = make_service_with_recorder();
3565 svc.emit_event(
3566 RdsSourceType::DbInstance,
3567 "my-db",
3568 "arn:aws:rds:us-east-1:123456789012:db:my-db",
3569 "RDS-EVENT-0005",
3570 &["creation"],
3571 "DB instance created",
3572 );
3573 let events = rec.events.lock().unwrap();
3574 assert_eq!(events.len(), 1);
3575 let e = &events[0];
3576 assert_eq!(e.source, "aws.rds");
3577 assert_eq!(e.detail_type, "RDS DB Instance Event");
3578 let detail: serde_json::Value = serde_json::from_str(&e.detail).unwrap();
3579 assert_eq!(detail["EventID"], "RDS-EVENT-0005");
3580 assert_eq!(detail["SourceType"], "DB_INSTANCE");
3581 assert_eq!(detail["SourceIdentifier"], "my-db");
3582 assert_eq!(detail["Message"], "DB instance created");
3583 assert_eq!(detail["EventCategories"][0], "creation");
3584 }
3585
3586 #[test]
3587 fn emit_event_no_op_without_bus() {
3588 let svc = make_service();
3589 svc.emit_event(
3590 RdsSourceType::DbSnapshot,
3591 "snap",
3592 "arn:aws:rds:us-east-1:123456789012:snapshot:snap",
3593 "RDS-EVENT-0042",
3594 &["creation"],
3595 "Manual snapshot created",
3596 );
3597 }
3598
3599 #[test]
3600 fn rds_source_type_detail_type_mapping() {
3601 assert_eq!(
3602 RdsSourceType::DbInstance.detail_type(),
3603 "RDS DB Instance Event"
3604 );
3605 assert_eq!(
3606 RdsSourceType::DbSnapshot.detail_type(),
3607 "RDS DB Snapshot Event"
3608 );
3609 assert_eq!(
3610 RdsSourceType::DbParameterGroup.detail_type(),
3611 "RDS DB Parameter Group Event"
3612 );
3613 }
3614
3615 fn body_of(resp: fakecloud_core::service::AwsResponse) -> String {
3616 String::from_utf8(resp.body.expect_bytes().to_vec()).expect("utf8")
3617 }
3618
3619 fn seed_instance(svc: &RdsService, identifier: &str) -> String {
3620 let arn = format!("arn:aws:rds:us-east-1:123456789012:db:{identifier}");
3621 let mut accounts = svc.state.write();
3622 let state = accounts.default_mut();
3623 state.instances.insert(
3624 identifier.to_string(),
3625 DbInstance {
3626 db_instance_identifier: identifier.to_string(),
3627 db_instance_arn: arn.clone(),
3628 db_instance_class: "db.t3.micro".to_string(),
3629 engine: "postgres".to_string(),
3630 engine_version: "16.3".to_string(),
3631 db_instance_status: "available".to_string(),
3632 master_username: "admin".to_string(),
3633 db_name: Some("appdb".to_string()),
3634 endpoint_address: "127.0.0.1".to_string(),
3635 port: 15432,
3636 allocated_storage: 20,
3637 publicly_accessible: true,
3638 deletion_protection: false,
3639 created_at: Utc::now(),
3640 dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3641 master_user_password: "secret".to_string(),
3642 container_id: "container".to_string(),
3643 host_port: 15432,
3644 tags: Vec::new(),
3645 read_replica_source_db_instance_identifier: None,
3646 read_replica_db_instance_identifiers: Vec::new(),
3647 vpc_security_group_ids: vec!["sg-12345678".to_string()],
3648 db_parameter_group_name: Some("default.postgres16".to_string()),
3649 backup_retention_period: 1,
3650 preferred_backup_window: "03:00-04:00".to_string(),
3651 latest_restorable_time: None,
3652 option_group_name: None,
3653 multi_az: false,
3654 pending_modified_values: None,
3655 },
3656 );
3657 arn
3658 }
3659
3660 fn assert_code<T>(result: Result<T, AwsServiceError>, expected_code: &str) -> AwsServiceError {
3661 match result {
3662 Ok(_) => panic!("expected error {expected_code}, got Ok"),
3663 Err(e) => {
3664 assert_eq!(e.code(), expected_code, "wrong error code");
3665 e
3666 }
3667 }
3668 }
3669
3670 #[test]
3673 fn add_tags_requires_resource_name() {
3674 let svc = make_service();
3675 let req = request("AddTagsToResource", &[]);
3676 assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3677 }
3678
3679 #[test]
3680 fn add_tags_requires_at_least_one_tag() {
3681 let svc = make_service();
3682 let arn = seed_instance(&svc, "db1");
3683 let req = request("AddTagsToResource", &[("ResourceName", arn.as_str())]);
3684 assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3685 }
3686
3687 #[test]
3688 fn add_tags_appends_then_list_tags_returns_them() {
3689 let svc = make_service();
3690 let arn = seed_instance(&svc, "db1");
3691 let add_req = request(
3692 "AddTagsToResource",
3693 &[
3694 ("ResourceName", arn.as_str()),
3695 ("Tags.Tag.1.Key", "env"),
3696 ("Tags.Tag.1.Value", "dev"),
3697 ],
3698 );
3699 svc.add_tags_to_resource(&add_req).unwrap();
3700
3701 let list_req = request("ListTagsForResource", &[("ResourceName", arn.as_str())]);
3702 let body = body_of(svc.list_tags_for_resource(&list_req).unwrap());
3703 assert!(body.contains("<Key>env</Key>"));
3704 assert!(body.contains("<Value>dev</Value>"));
3705 }
3706
3707 #[test]
3708 fn list_tags_rejects_filters_param() {
3709 let svc = make_service();
3710 let arn = seed_instance(&svc, "db1");
3711 let req = request(
3712 "ListTagsForResource",
3713 &[
3714 ("ResourceName", arn.as_str()),
3715 ("Filters.Filter.1.Name", "x"),
3716 ],
3717 );
3718 assert_code(svc.list_tags_for_resource(&req), "InvalidParameterValue");
3719 }
3720
3721 #[test]
3722 fn list_tags_unknown_arn_errors() {
3723 let svc = make_service();
3724 let req = request(
3725 "ListTagsForResource",
3726 &[("ResourceName", "arn:aws:rds:us-east-1:123456789012:db:nope")],
3727 );
3728 assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
3729 }
3730
3731 #[test]
3732 fn remove_tags_strips_only_listed_keys() {
3733 let svc = make_service();
3734 let arn = seed_instance(&svc, "db1");
3735 {
3736 let mut __a = svc.state.write();
3737 let state = __a.default_mut();
3738 let inst = state.instances.get_mut("db1").unwrap();
3739 inst.tags = vec![
3740 RdsTag {
3741 key: "env".to_string(),
3742 value: "dev".to_string(),
3743 },
3744 RdsTag {
3745 key: "team".to_string(),
3746 value: "core".to_string(),
3747 },
3748 ];
3749 }
3750 let req = request(
3751 "RemoveTagsFromResource",
3752 &[("ResourceName", arn.as_str()), ("TagKeys.member.1", "env")],
3753 );
3754 svc.remove_tags_from_resource(&req).unwrap();
3755
3756 let __a = svc.state.read();
3757 let state = __a.default_ref();
3758 let tags = &state.instances.get("db1").unwrap().tags;
3759 assert_eq!(tags.len(), 1);
3760 assert_eq!(tags[0].key, "team");
3761 }
3762
3763 #[test]
3764 fn remove_tags_requires_keys() {
3765 let svc = make_service();
3766 let arn = seed_instance(&svc, "db1");
3767 let req = request("RemoveTagsFromResource", &[("ResourceName", arn.as_str())]);
3768 assert_code(svc.remove_tags_from_resource(&req), "MissingParameter");
3769 }
3770
3771 fn create_subnet_group(svc: &RdsService, name: &str) {
3774 let req = request(
3775 "CreateDBSubnetGroup",
3776 &[
3777 ("DBSubnetGroupName", name),
3778 ("DBSubnetGroupDescription", "test"),
3779 ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3780 ("SubnetIds.SubnetIdentifier.2", "subnet-bbb"),
3781 ],
3782 );
3783 svc.create_db_subnet_group(&req).unwrap();
3784 }
3785
3786 #[test]
3787 fn create_db_subnet_group_requires_two_subnets() {
3788 let svc = make_service();
3789 let req = request(
3790 "CreateDBSubnetGroup",
3791 &[
3792 ("DBSubnetGroupName", "sg1"),
3793 ("DBSubnetGroupDescription", "t"),
3794 ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3795 ],
3796 );
3797 assert_code(
3798 svc.create_db_subnet_group(&req),
3799 "DBSubnetGroupDoesNotCoverEnoughAZs",
3800 );
3801 }
3802
3803 #[test]
3804 fn create_db_subnet_group_rejects_empty_subnets() {
3805 let svc = make_service();
3806 let req = request(
3807 "CreateDBSubnetGroup",
3808 &[
3809 ("DBSubnetGroupName", "sg1"),
3810 ("DBSubnetGroupDescription", "t"),
3811 ],
3812 );
3813 assert_code(svc.create_db_subnet_group(&req), "InvalidParameterValue");
3814 }
3815
3816 #[test]
3817 fn create_db_subnet_group_rejects_duplicates() {
3818 let svc = make_service();
3819 create_subnet_group(&svc, "sg1");
3820 let req = request(
3821 "CreateDBSubnetGroup",
3822 &[
3823 ("DBSubnetGroupName", "sg1"),
3824 ("DBSubnetGroupDescription", "t"),
3825 ("SubnetIds.SubnetIdentifier.1", "subnet-x"),
3826 ("SubnetIds.SubnetIdentifier.2", "subnet-y"),
3827 ],
3828 );
3829 assert_code(
3830 svc.create_db_subnet_group(&req),
3831 "DBSubnetGroupAlreadyExists",
3832 );
3833 }
3834
3835 #[test]
3836 fn describe_db_subnet_groups_by_name_or_list() {
3837 let svc = make_service();
3838 create_subnet_group(&svc, "sg-alpha");
3839 create_subnet_group(&svc, "sg-beta");
3840
3841 let by_name = request(
3842 "DescribeDBSubnetGroups",
3843 &[("DBSubnetGroupName", "sg-alpha")],
3844 );
3845 let body = body_of(svc.describe_db_subnet_groups(&by_name).unwrap());
3846 assert!(body.contains("sg-alpha"));
3847 assert!(!body.contains("sg-beta"));
3848
3849 let list_all = request("DescribeDBSubnetGroups", &[]);
3850 let body = body_of(svc.describe_db_subnet_groups(&list_all).unwrap());
3851 assert!(body.contains("sg-alpha"));
3852 assert!(body.contains("sg-beta"));
3853 }
3854
3855 #[test]
3856 fn describe_db_subnet_groups_unknown_name_errors() {
3857 let svc = make_service();
3858 let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
3859 assert_code(
3860 svc.describe_db_subnet_groups(&req),
3861 "DBSubnetGroupNotFoundFault",
3862 );
3863 }
3864
3865 #[test]
3866 fn delete_db_subnet_group_unknown_errors() {
3867 let svc = make_service();
3868 let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
3869 assert_code(
3870 svc.delete_db_subnet_group(&req),
3871 "DBSubnetGroupNotFoundFault",
3872 );
3873 }
3874
3875 #[test]
3876 fn delete_db_subnet_group_removes_entry() {
3877 let svc = make_service();
3878 create_subnet_group(&svc, "sg1");
3879 let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "sg1")]);
3880 svc.delete_db_subnet_group(&req).unwrap();
3881 assert!(svc.state.read().default_ref().subnet_groups.is_empty());
3882 }
3883
3884 #[test]
3885 fn modify_db_subnet_group_updates_subnet_ids() {
3886 let svc = make_service();
3887 create_subnet_group(&svc, "sg1");
3888 let req = request(
3889 "ModifyDBSubnetGroup",
3890 &[
3891 ("DBSubnetGroupName", "sg1"),
3892 ("SubnetIds.SubnetIdentifier.1", "subnet-new1"),
3893 ("SubnetIds.SubnetIdentifier.2", "subnet-new2"),
3894 ],
3895 );
3896 svc.modify_db_subnet_group(&req).unwrap();
3897
3898 let __a = svc.state.read();
3899 let state = __a.default_ref();
3900 let sg = state.subnet_groups.get("sg1").unwrap();
3901 assert_eq!(sg.subnet_ids, vec!["subnet-new1", "subnet-new2"]);
3902 }
3903
3904 fn create_param_group(svc: &RdsService, name: &str) {
3907 let req = request(
3908 "CreateDBParameterGroup",
3909 &[
3910 ("DBParameterGroupName", name),
3911 ("DBParameterGroupFamily", "postgres16"),
3912 ("Description", "test"),
3913 ],
3914 );
3915 svc.create_db_parameter_group(&req).unwrap();
3916 }
3917
3918 #[test]
3919 fn create_db_parameter_group_rejects_unknown_family() {
3920 let svc = make_service();
3921 let req = request(
3922 "CreateDBParameterGroup",
3923 &[
3924 ("DBParameterGroupName", "pg1"),
3925 ("DBParameterGroupFamily", "oracle19"),
3926 ("Description", "t"),
3927 ],
3928 );
3929 assert_code(svc.create_db_parameter_group(&req), "InvalidParameterValue");
3930 }
3931
3932 #[test]
3933 fn create_db_parameter_group_rejects_duplicates() {
3934 let svc = make_service();
3935 create_param_group(&svc, "pg1");
3936 let req = request(
3937 "CreateDBParameterGroup",
3938 &[
3939 ("DBParameterGroupName", "pg1"),
3940 ("DBParameterGroupFamily", "postgres16"),
3941 ("Description", "t"),
3942 ],
3943 );
3944 assert_code(
3945 svc.create_db_parameter_group(&req),
3946 "DBParameterGroupAlreadyExists",
3947 );
3948 }
3949
3950 #[test]
3951 fn describe_db_parameter_groups_by_name_or_list() {
3952 let svc = make_service();
3953 create_param_group(&svc, "pg-alpha");
3954 create_param_group(&svc, "pg-beta");
3955 let by_name = request(
3956 "DescribeDBParameterGroups",
3957 &[("DBParameterGroupName", "pg-alpha")],
3958 );
3959 let body = body_of(svc.describe_db_parameter_groups(&by_name).unwrap());
3960 assert!(body.contains("pg-alpha"));
3961 assert!(!body.contains("pg-beta"));
3962 let list = request("DescribeDBParameterGroups", &[]);
3963 let body = body_of(svc.describe_db_parameter_groups(&list).unwrap());
3964 assert!(body.contains("pg-alpha"));
3965 assert!(body.contains("pg-beta"));
3966 }
3967
3968 #[test]
3969 fn describe_db_parameter_groups_unknown_name_errors() {
3970 let svc = make_service();
3971 let req = request(
3972 "DescribeDBParameterGroups",
3973 &[("DBParameterGroupName", "ghost")],
3974 );
3975 assert_code(
3976 svc.describe_db_parameter_groups(&req),
3977 "DBParameterGroupNotFound",
3978 );
3979 }
3980
3981 #[test]
3982 fn delete_db_parameter_group_rejects_default_groups() {
3983 let svc = make_service();
3984 let req = request(
3985 "DeleteDBParameterGroup",
3986 &[("DBParameterGroupName", "default.postgres16")],
3987 );
3988 assert_code(svc.delete_db_parameter_group(&req), "InvalidParameterValue");
3989 }
3990
3991 #[test]
3992 fn delete_db_parameter_group_unknown_errors() {
3993 let svc = make_service();
3994 let req = request(
3995 "DeleteDBParameterGroup",
3996 &[("DBParameterGroupName", "ghost")],
3997 );
3998 assert_code(
3999 svc.delete_db_parameter_group(&req),
4000 "DBParameterGroupNotFound",
4001 );
4002 }
4003
4004 #[test]
4005 fn delete_db_parameter_group_removes_entry() {
4006 let svc = make_service();
4007 create_param_group(&svc, "pg1");
4008 let req = request("DeleteDBParameterGroup", &[("DBParameterGroupName", "pg1")]);
4009 svc.delete_db_parameter_group(&req).unwrap();
4010 assert!(!svc
4011 .state
4012 .read()
4013 .default_ref()
4014 .parameter_groups
4015 .contains_key("pg1"));
4016 }
4017
4018 #[test]
4019 fn modify_db_parameter_group_updates_description() {
4020 let svc = make_service();
4021 create_param_group(&svc, "pg1");
4022 let req = request(
4023 "ModifyDBParameterGroup",
4024 &[
4025 ("DBParameterGroupName", "pg1"),
4026 ("Description", "shiny new"),
4027 ],
4028 );
4029 svc.modify_db_parameter_group(&req).unwrap();
4030 let __a = svc.state.read();
4031 let state = __a.default_ref();
4032 assert_eq!(
4033 state.parameter_groups.get("pg1").unwrap().description,
4034 "shiny new"
4035 );
4036 }
4037
4038 #[test]
4039 fn modify_db_parameter_group_unknown_errors() {
4040 let svc = make_service();
4041 let req = request(
4042 "ModifyDBParameterGroup",
4043 &[("DBParameterGroupName", "ghost"), ("Description", "x")],
4044 );
4045 assert_code(
4046 svc.modify_db_parameter_group(&req),
4047 "DBParameterGroupNotFound",
4048 );
4049 }
4050
4051 #[test]
4054 fn describe_db_instances_by_id_returns_only_one() {
4055 let svc = make_service();
4056 seed_instance(&svc, "db1");
4057 seed_instance(&svc, "db2");
4058 let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "db1")]);
4059 let body = body_of(svc.describe_db_instances(&req).unwrap());
4060 assert!(body.contains("<DBInstanceIdentifier>db1</DBInstanceIdentifier>"));
4061 assert!(!body.contains("<DBInstanceIdentifier>db2</DBInstanceIdentifier>"));
4062 }
4063
4064 #[test]
4065 fn describe_db_instances_unknown_id_errors() {
4066 let svc = make_service();
4067 let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
4068 assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
4069 }
4070
4071 #[test]
4072 fn describe_db_instances_lists_all_when_unbounded() {
4073 let svc = make_service();
4074 seed_instance(&svc, "db1");
4075 seed_instance(&svc, "db2");
4076 seed_instance(&svc, "db3");
4077 let req = request("DescribeDBInstances", &[]);
4078 let body = body_of(svc.describe_db_instances(&req).unwrap());
4079 for id in ["db1", "db2", "db3"] {
4080 assert!(body.contains(&format!(
4081 "<DBInstanceIdentifier>{id}</DBInstanceIdentifier>"
4082 )));
4083 }
4084 }
4085
4086 #[test]
4089 fn modify_db_instance_requires_at_least_one_change() {
4090 let svc = make_service();
4091 seed_instance(&svc, "db1");
4092 let req = request("ModifyDBInstance", &[("DBInstanceIdentifier", "db1")]);
4093 assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
4094 }
4095
4096 #[test]
4097 fn modify_db_instance_unknown_errors() {
4098 let svc = make_service();
4099 let req = request(
4100 "ModifyDBInstance",
4101 &[
4102 ("DBInstanceIdentifier", "ghost"),
4103 ("DBInstanceClass", "db.t3.small"),
4104 ],
4105 );
4106 assert_code(svc.modify_db_instance(&req), "DBInstanceNotFound");
4107 }
4108
4109 #[test]
4110 fn modify_db_instance_apply_immediately_updates_class() {
4111 let svc = make_service();
4112 seed_instance(&svc, "db1");
4113 let req = request(
4114 "ModifyDBInstance",
4115 &[
4116 ("DBInstanceIdentifier", "db1"),
4117 ("DBInstanceClass", "db.t3.small"),
4118 ("ApplyImmediately", "true"),
4119 ],
4120 );
4121 svc.modify_db_instance(&req).unwrap();
4122 let __a = svc.state.read();
4123 let state = __a.default_ref();
4124 assert_eq!(
4125 state.instances.get("db1").unwrap().db_instance_class,
4126 "db.t3.small"
4127 );
4128 }
4129
4130 #[test]
4131 fn modify_db_instance_pending_when_not_apply_immediately() {
4132 let svc = make_service();
4133 seed_instance(&svc, "db1");
4134 let req = request(
4135 "ModifyDBInstance",
4136 &[
4137 ("DBInstanceIdentifier", "db1"),
4138 ("DBInstanceClass", "db.t3.small"),
4139 ("ApplyImmediately", "false"),
4140 ],
4141 );
4142 svc.modify_db_instance(&req).unwrap();
4143 let __a = svc.state.read();
4144 let state = __a.default_ref();
4145 let inst = state.instances.get("db1").unwrap();
4146 assert_eq!(inst.db_instance_class, "db.t3.micro");
4147 assert_eq!(
4148 inst.pending_modified_values
4149 .as_ref()
4150 .unwrap()
4151 .db_instance_class
4152 .as_deref(),
4153 Some("db.t3.small"),
4154 );
4155 }
4156
4157 fn seed_snapshot(svc: &RdsService, snapshot_id: &str, instance_id: &str) {
4160 let mut __a = svc.state.write();
4161 let state = __a.default_mut();
4162 let arn = state.db_snapshot_arn(snapshot_id);
4163 state.snapshots.insert(
4164 snapshot_id.to_string(),
4165 crate::state::DbSnapshot {
4166 db_snapshot_identifier: snapshot_id.to_string(),
4167 db_snapshot_arn: arn,
4168 db_instance_identifier: instance_id.to_string(),
4169 snapshot_create_time: Utc::now(),
4170 engine: "postgres".to_string(),
4171 engine_version: "16.3".to_string(),
4172 allocated_storage: 20,
4173 status: "available".to_string(),
4174 port: 5432,
4175 master_username: "admin".to_string(),
4176 db_name: Some("appdb".to_string()),
4177 dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
4178 snapshot_type: "manual".to_string(),
4179 master_user_password: "secret".to_string(),
4180 tags: Vec::new(),
4181 dump_data: Vec::new(),
4182 },
4183 );
4184 }
4185
4186 #[test]
4187 fn delete_db_snapshot_removes_entry() {
4188 let svc = make_service();
4189 seed_snapshot(&svc, "snap1", "db1");
4190 let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "snap1")]);
4191 svc.delete_db_snapshot(&req).unwrap();
4192 assert!(svc.state.read().default_ref().snapshots.is_empty());
4193 }
4194
4195 #[test]
4196 fn delete_db_snapshot_unknown_errors() {
4197 let svc = make_service();
4198 let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "ghost")]);
4199 assert_code(svc.delete_db_snapshot(&req), "DBSnapshotNotFound");
4200 }
4201
4202 #[test]
4203 fn describe_db_snapshots_rejects_both_filters() {
4204 let svc = make_service();
4205 let req = request(
4206 "DescribeDBSnapshots",
4207 &[("DBSnapshotIdentifier", "s"), ("DBInstanceIdentifier", "i")],
4208 );
4209 assert_code(
4210 svc.describe_db_snapshots(&req),
4211 "InvalidParameterCombination",
4212 );
4213 }
4214
4215 #[test]
4216 fn describe_db_snapshots_by_id_or_instance() {
4217 let svc = make_service();
4218 seed_snapshot(&svc, "snap1", "db1");
4219 seed_snapshot(&svc, "snap2", "db2");
4220
4221 let by_id = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "snap1")]);
4222 let body = body_of(svc.describe_db_snapshots(&by_id).unwrap());
4223 assert!(body.contains("snap1"));
4224 assert!(!body.contains("snap2"));
4225
4226 let by_instance = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "db2")]);
4227 let body = body_of(svc.describe_db_snapshots(&by_instance).unwrap());
4228 assert!(body.contains("snap2"));
4229 assert!(!body.contains("snap1"));
4230
4231 let list_all = request("DescribeDBSnapshots", &[]);
4232 let body = body_of(svc.describe_db_snapshots(&list_all).unwrap());
4233 assert!(body.contains("snap1"));
4234 assert!(body.contains("snap2"));
4235 }
4236
4237 #[test]
4238 fn describe_db_snapshots_unknown_id_errors() {
4239 let svc = make_service();
4240 let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "ghost")]);
4241 assert_code(svc.describe_db_snapshots(&req), "DBSnapshotNotFound");
4242 }
4243
4244 #[test]
4247 fn describe_db_instances_not_found() {
4248 let svc = make_service();
4249 let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
4250 assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
4251 }
4252
4253 #[tokio::test]
4254 async fn delete_db_instance_not_found() {
4255 let svc = make_service();
4256 let req = request(
4257 "DeleteDBInstance",
4258 &[
4259 ("DBInstanceIdentifier", "ghost"),
4260 ("SkipFinalSnapshot", "true"),
4261 ],
4262 );
4263 assert_code(svc.delete_db_instance(&req).await, "DBInstanceNotFound");
4264 }
4265
4266 #[test]
4267 fn modify_db_instance_not_found() {
4268 let svc = make_service();
4269 let req = request(
4270 "ModifyDBInstance",
4271 &[
4272 ("DBInstanceIdentifier", "ghost"),
4273 ("AllocatedStorage", "20"),
4274 ],
4275 );
4276 assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
4278 }
4279
4280 #[tokio::test]
4281 async fn reboot_db_instance_not_found() {
4282 let svc = make_service();
4283 let req = request("RebootDBInstance", &[("DBInstanceIdentifier", "ghost")]);
4284 assert_code(svc.reboot_db_instance(&req).await, "DBInstanceNotFound");
4285 }
4286
4287 #[tokio::test]
4288 async fn create_db_snapshot_instance_not_found() {
4289 let svc = make_service();
4290 let req = request(
4291 "CreateDBSnapshot",
4292 &[
4293 ("DBInstanceIdentifier", "ghost"),
4294 ("DBSnapshotIdentifier", "snap1"),
4295 ],
4296 );
4297 assert_code(svc.create_db_snapshot(&req).await, "InvalidParameterValue");
4298 }
4299
4300 #[tokio::test]
4301 async fn restore_db_instance_snapshot_not_found() {
4302 let svc = make_service();
4303 let req = request(
4304 "RestoreDBInstanceFromDBSnapshot",
4305 &[
4306 ("DBInstanceIdentifier", "restored"),
4307 ("DBSnapshotIdentifier", "ghost-snap"),
4308 ],
4309 );
4310 assert_code(
4311 svc.restore_db_instance_from_db_snapshot(&req).await,
4312 "InvalidParameterValue",
4313 );
4314 }
4315
4316 #[tokio::test]
4317 async fn create_db_instance_read_replica_source_not_found() {
4318 let svc = make_service();
4319 let req = request(
4320 "CreateDBInstanceReadReplica",
4321 &[
4322 ("DBInstanceIdentifier", "replica"),
4323 ("SourceDBInstanceIdentifier", "ghost"),
4324 ],
4325 );
4326 assert_code(
4327 svc.create_db_instance_read_replica(&req).await,
4328 "InvalidParameterValue",
4329 );
4330 }
4331
4332 #[test]
4333 fn describe_db_engine_versions_basic() {
4334 let svc = make_service();
4335 let req = request("DescribeDBEngineVersions", &[]);
4336 let resp = svc.describe_db_engine_versions(&req).unwrap();
4337 let body = body_of(resp);
4338 assert!(body.contains("<DBEngineVersions>"));
4339 }
4340
4341 #[test]
4342 fn describe_orderable_db_instance_options_basic() {
4343 let svc = make_service();
4344 let req = request("DescribeOrderableDBInstanceOptions", &[("Engine", "mysql")]);
4345 let resp = svc.describe_orderable_db_instance_options(&req).unwrap();
4346 let body = body_of(resp);
4347 assert!(body.contains("<OrderableDBInstanceOptions>"));
4348 }
4349
4350 #[test]
4351 fn describe_db_parameter_group_not_found() {
4352 let svc = make_service();
4353 let req = request(
4354 "DescribeDBParameterGroups",
4355 &[("DBParameterGroupName", "ghost")],
4356 );
4357 assert_code(
4358 svc.describe_db_parameter_groups(&req),
4359 "DBParameterGroupNotFound",
4360 );
4361 }
4362
4363 #[test]
4364 fn delete_db_parameter_group_not_found() {
4365 let svc = make_service();
4366 let req = request(
4367 "DeleteDBParameterGroup",
4368 &[("DBParameterGroupName", "ghost")],
4369 );
4370 assert_code(
4371 svc.delete_db_parameter_group(&req),
4372 "DBParameterGroupNotFound",
4373 );
4374 }
4375
4376 #[test]
4377 fn describe_db_subnet_group_not_found() {
4378 let svc = make_service();
4379 let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4380 assert_code(
4381 svc.describe_db_subnet_groups(&req),
4382 "DBSubnetGroupNotFoundFault",
4383 );
4384 }
4385
4386 #[test]
4387 fn delete_db_subnet_group_not_found() {
4388 let svc = make_service();
4389 let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
4390 assert_code(
4391 svc.delete_db_subnet_group(&req),
4392 "DBSubnetGroupNotFoundFault",
4393 );
4394 }
4395
4396 #[test]
4397 fn add_tags_resource_not_found() {
4398 let svc = make_service();
4399 let req = request(
4400 "AddTagsToResource",
4401 &[
4402 ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4403 ("Tags.member.1.Key", "k"),
4404 ("Tags.member.1.Value", "v"),
4405 ],
4406 );
4407 assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
4408 }
4409
4410 #[test]
4411 fn list_tags_resource_not_found() {
4412 let svc = make_service();
4413 let req = request(
4414 "ListTagsForResource",
4415 &[("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost")],
4416 );
4417 assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
4418 }
4419
4420 #[tokio::test]
4423 async fn create_db_snapshot_missing_id_errors() {
4424 let svc = make_service();
4425 let req = request(
4426 "CreateDBSnapshot",
4427 &[("DBInstanceIdentifier", "nonexistent")],
4428 );
4429 assert_code(svc.create_db_snapshot(&req).await, "MissingParameter");
4430 }
4431
4432 #[tokio::test]
4433 async fn create_db_snapshot_unknown_instance_errors() {
4434 let svc = make_service();
4435 let req = request(
4436 "CreateDBSnapshot",
4437 &[
4438 ("DBSnapshotIdentifier", "snap1"),
4439 ("DBInstanceIdentifier", "ghost"),
4440 ],
4441 );
4442 assert!(svc.create_db_snapshot(&req).await.is_err());
4443 }
4444
4445 #[tokio::test]
4448 async fn delete_db_instance_missing_id_errors() {
4449 let svc = make_service();
4450 let req = request("DeleteDBInstance", &[]);
4451 assert_code(svc.delete_db_instance(&req).await, "MissingParameter");
4452 }
4453
4454 #[tokio::test]
4457 async fn reboot_db_instance_missing_id_errors() {
4458 let svc = make_service();
4459 let req = request("RebootDBInstance", &[]);
4460 assert_code(svc.reboot_db_instance(&req).await, "MissingParameter");
4461 }
4462
4463 #[tokio::test]
4466 async fn create_db_instance_missing_id_errors() {
4467 let svc = make_service();
4468 let req = request(
4469 "CreateDBInstance",
4470 &[
4471 ("Engine", "postgres"),
4472 ("DBInstanceClass", "db.t3.micro"),
4473 ("AllocatedStorage", "20"),
4474 ("MasterUsername", "admin"),
4475 ("MasterUserPassword", "secretpass"),
4476 ],
4477 );
4478 assert!(svc.create_db_instance(&req).await.is_err());
4479 }
4480
4481 #[tokio::test]
4482 async fn create_db_instance_unsupported_engine_errors() {
4483 let svc = make_service();
4484 let req = request(
4485 "CreateDBInstance",
4486 &[
4487 ("DBInstanceIdentifier", "db1"),
4488 ("Engine", "mongodb"),
4489 ("DBInstanceClass", "db.t3.micro"),
4490 ("AllocatedStorage", "20"),
4491 ("MasterUsername", "admin"),
4492 ("MasterUserPassword", "secretpass"),
4493 ],
4494 );
4495 assert!(svc.create_db_instance(&req).await.is_err());
4496 }
4497
4498 #[tokio::test]
4501 async fn restore_db_instance_missing_ids_errors() {
4502 let svc = make_service();
4503 let req = request("RestoreDBInstanceFromDBSnapshot", &[]);
4504 assert!(svc
4505 .restore_db_instance_from_db_snapshot(&req)
4506 .await
4507 .is_err());
4508 }
4509
4510 #[tokio::test]
4511 async fn restore_db_instance_unknown_snapshot_errors() {
4512 let svc = make_service();
4513 let req = request(
4514 "RestoreDBInstanceFromDBSnapshot",
4515 &[
4516 ("DBInstanceIdentifier", "restored"),
4517 ("DBSnapshotIdentifier", "missing"),
4518 ],
4519 );
4520 assert!(svc
4521 .restore_db_instance_from_db_snapshot(&req)
4522 .await
4523 .is_err());
4524 }
4525
4526 #[tokio::test]
4529 async fn create_read_replica_missing_source_errors() {
4530 let svc = make_service();
4531 let req = request(
4532 "CreateDBInstanceReadReplica",
4533 &[("DBInstanceIdentifier", "replica1")],
4534 );
4535 assert!(svc.create_db_instance_read_replica(&req).await.is_err());
4536 }
4537
4538 #[tokio::test]
4539 async fn create_read_replica_unknown_source_errors() {
4540 let svc = make_service();
4541 let req = request(
4542 "CreateDBInstanceReadReplica",
4543 &[
4544 ("DBInstanceIdentifier", "replica1"),
4545 ("SourceDBInstanceIdentifier", "ghost"),
4546 ],
4547 );
4548 assert!(svc.create_db_instance_read_replica(&req).await.is_err());
4549 }
4550
4551 #[test]
4554 fn describe_db_snapshots_by_snapshot_id_only() {
4555 let svc = make_service();
4556 seed_snapshot(&svc, "s1", "inst1");
4557 let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "s1")]);
4558 let resp = svc.describe_db_snapshots(&req).unwrap();
4559 let b = body_of(resp);
4560 assert!(b.contains("<DBSnapshotIdentifier>s1</DBSnapshotIdentifier>"));
4561 }
4562
4563 #[test]
4564 fn describe_db_snapshots_by_instance_id_returns_matching() {
4565 let svc = make_service();
4566 seed_snapshot(&svc, "s1", "inst1");
4567 seed_snapshot(&svc, "s2", "inst2");
4568 let req = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "inst1")]);
4569 let resp = svc.describe_db_snapshots(&req).unwrap();
4570 let b = body_of(resp);
4571 assert!(b.contains("s1"));
4572 assert!(!b.contains("<DBSnapshotIdentifier>s2</DBSnapshotIdentifier>"));
4573 }
4574
4575 #[test]
4578 fn modify_db_parameter_group_missing_name() {
4579 let svc = make_service();
4580 let req = request("ModifyDBParameterGroup", &[]);
4581 assert!(svc.modify_db_parameter_group(&req).is_err());
4582 }
4583
4584 #[test]
4587 fn modify_db_subnet_group_unknown_errors() {
4588 let svc = make_service();
4589 let req = request(
4590 "ModifyDBSubnetGroup",
4591 &[
4592 ("DBSubnetGroupName", "ghost"),
4593 ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4594 ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4595 ],
4596 );
4597 assert!(svc.modify_db_subnet_group(&req).is_err());
4598 }
4599
4600 #[test]
4603 fn describe_db_instances_empty_returns_xml() {
4604 let svc = make_service();
4605 let req = request("DescribeDBInstances", &[]);
4606 let resp = svc.describe_db_instances(&req).unwrap();
4607 let b = body_of(resp);
4608 assert!(b.contains("DescribeDBInstancesResult"));
4609 }
4610
4611 #[test]
4612 fn describe_db_snapshots_empty_returns_empty_list() {
4613 let svc = make_service();
4614 let req = request("DescribeDBSnapshots", &[]);
4615 let resp = svc.describe_db_snapshots(&req).unwrap();
4616 let b = body_of(resp);
4617 assert!(b.contains("DescribeDBSnapshotsResult"));
4618 }
4619
4620 #[test]
4621 fn add_tags_unknown_resource_errors() {
4622 let svc = make_service();
4623 let req = request(
4624 "AddTagsToResource",
4625 &[
4626 ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4627 ("Tags.member.1.Key", "k"),
4628 ("Tags.member.1.Value", "v"),
4629 ],
4630 );
4631 assert!(svc.add_tags_to_resource(&req).is_err());
4632 }
4633
4634 #[test]
4635 fn remove_tags_unknown_resource_errors() {
4636 let svc = make_service();
4637 let req = request(
4638 "RemoveTagsFromResource",
4639 &[
4640 ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4641 ("TagKeys.member.1", "k"),
4642 ],
4643 );
4644 assert!(svc.remove_tags_from_resource(&req).is_err());
4645 }
4646
4647 #[test]
4648 fn create_db_parameter_group_missing_name_errors() {
4649 let svc = make_service();
4650 let req = request(
4651 "CreateDBParameterGroup",
4652 &[
4653 ("DBParameterGroupFamily", "postgres16"),
4654 ("Description", "d"),
4655 ],
4656 );
4657 assert!(svc.create_db_parameter_group(&req).is_err());
4658 }
4659
4660 #[test]
4661 fn create_db_subnet_group_missing_desc_errors() {
4662 let svc = make_service();
4663 let req = request(
4664 "CreateDBSubnetGroup",
4665 &[
4666 ("DBSubnetGroupName", "sg1"),
4667 ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4668 ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4669 ],
4670 );
4671 assert!(svc.create_db_subnet_group(&req).is_err());
4672 }
4673
4674 #[tokio::test]
4675 async fn create_db_instance_missing_class_errors() {
4676 let svc = make_service();
4677 let req = request(
4678 "CreateDBInstance",
4679 &[
4680 ("DBInstanceIdentifier", "miss-class"),
4681 ("Engine", "postgres"),
4682 ("AllocatedStorage", "20"),
4683 ("MasterUsername", "admin"),
4684 ("MasterUserPassword", "secretpass"),
4685 ],
4686 );
4687 assert!(svc.create_db_instance(&req).await.is_err());
4688 }
4689
4690 #[tokio::test]
4691 async fn create_db_instance_missing_master_username_errors() {
4692 let svc = make_service();
4693 let req = request(
4694 "CreateDBInstance",
4695 &[
4696 ("DBInstanceIdentifier", "miss-mu"),
4697 ("Engine", "postgres"),
4698 ("DBInstanceClass", "db.t3.micro"),
4699 ("AllocatedStorage", "20"),
4700 ("MasterUserPassword", "secretpass"),
4701 ],
4702 );
4703 assert!(svc.create_db_instance(&req).await.is_err());
4704 }
4705
4706 #[test]
4707 fn modify_db_instance_missing_id_errors() {
4708 let svc = make_service();
4709 let req = request("ModifyDBInstance", &[]);
4710 assert!(svc.modify_db_instance(&req).is_err());
4711 }
4712
4713 #[test]
4714 fn modify_db_parameter_group_unknown_pg_errors() {
4715 let svc = make_service();
4716 let req = request(
4717 "ModifyDBParameterGroup",
4718 &[
4719 ("DBParameterGroupName", "ghost"),
4720 ("Parameters.member.1.ParameterName", "p"),
4721 ("Parameters.member.1.ParameterValue", "v"),
4722 ("Parameters.member.1.ApplyMethod", "immediate"),
4723 ],
4724 );
4725 assert!(svc.modify_db_parameter_group(&req).is_err());
4726 }
4727
4728 #[test]
4729 fn describe_db_parameter_groups_unknown_errors() {
4730 let svc = make_service();
4731 let req = request(
4732 "DescribeDBParameterGroups",
4733 &[("DBParameterGroupName", "ghost")],
4734 );
4735 assert!(svc.describe_db_parameter_groups(&req).is_err());
4736 }
4737
4738 #[test]
4739 fn describe_db_subnet_groups_unknown_errors() {
4740 let svc = make_service();
4741 let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4742 assert!(svc.describe_db_subnet_groups(&req).is_err());
4743 }
4744}