1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8
9use fakecloud_aws::xml::xml_escape;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11
12use crate::runtime::{RdsRuntime, RuntimeError};
13use crate::state::{
14 default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
15 DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsTag, SharedRdsState,
16};
17
18const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
19const SUPPORTED_ACTIONS: &[&str] = &[
20 "AddTagsToResource",
21 "CreateDBInstance",
22 "CreateDBInstanceReadReplica",
23 "CreateDBParameterGroup",
24 "CreateDBSnapshot",
25 "CreateDBSubnetGroup",
26 "DeleteDBInstance",
27 "DeleteDBParameterGroup",
28 "DeleteDBSnapshot",
29 "DeleteDBSubnetGroup",
30 "DescribeDBEngineVersions",
31 "DescribeDBInstances",
32 "DescribeDBParameterGroups",
33 "DescribeDBSnapshots",
34 "DescribeDBSubnetGroups",
35 "DescribeOrderableDBInstanceOptions",
36 "ListTagsForResource",
37 "ModifyDBInstance",
38 "ModifyDBParameterGroup",
39 "ModifyDBSubnetGroup",
40 "RebootDBInstance",
41 "RemoveTagsFromResource",
42 "RestoreDBInstanceFromDBSnapshot",
43];
44
45pub struct RdsService {
46 state: SharedRdsState,
47 runtime: Option<Arc<RdsRuntime>>,
48}
49
50impl RdsService {
51 pub fn new(state: SharedRdsState) -> Self {
52 Self {
53 state,
54 runtime: None,
55 }
56 }
57
58 pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
59 self.runtime = Some(runtime);
60 self
61 }
62
63 fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
69 self.runtime.as_ref().ok_or_else(|| {
70 AwsServiceError::aws_error(
71 StatusCode::SERVICE_UNAVAILABLE,
72 "InvalidParameterValue",
73 "Docker/Podman is required for RDS DB instances but is not available",
74 )
75 })
76 }
77}
78
79#[async_trait]
80impl AwsService for RdsService {
81 fn service_name(&self) -> &str {
82 "rds"
83 }
84
85 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
86 match request.action.as_str() {
87 "AddTagsToResource" => self.add_tags_to_resource(&request),
88 "CreateDBInstance" => self.create_db_instance(&request).await,
89 "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
90 "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
91 "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
92 "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
93 "DeleteDBInstance" => self.delete_db_instance(&request).await,
94 "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
95 "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
96 "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
97 "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
98 "DescribeDBInstances" => self.describe_db_instances(&request),
99 "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
100 "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
101 "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
102 "DescribeOrderableDBInstanceOptions" => {
103 self.describe_orderable_db_instance_options(&request)
104 }
105 "ListTagsForResource" => self.list_tags_for_resource(&request),
106 "ModifyDBInstance" => self.modify_db_instance(&request),
107 "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
108 "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
109 "RebootDBInstance" => self.reboot_db_instance(&request).await,
110 "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
111 "RestoreDBInstanceFromDBSnapshot" => {
112 self.restore_db_instance_from_db_snapshot(&request).await
113 }
114 _ => Err(AwsServiceError::action_not_implemented(
115 self.service_name(),
116 &request.action,
117 )),
118 }
119 }
120
121 fn supported_actions(&self) -> &[&str] {
122 SUPPORTED_ACTIONS
123 }
124}
125
126impl RdsService {
127 async fn create_db_instance(
128 &self,
129 request: &AwsRequest,
130 ) -> Result<AwsResponse, AwsServiceError> {
131 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
132 let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
133 let db_instance_class = required_param(request, "DBInstanceClass")?;
134 let engine = required_param(request, "Engine")?;
135 let master_username = required_param(request, "MasterUsername")?;
136 let master_user_password = required_param(request, "MasterUserPassword")?;
137 let db_name = optional_param(request, "DBName");
138 let engine_version =
139 optional_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
140 let publicly_accessible =
141 parse_optional_bool(optional_param(request, "PubliclyAccessible").as_deref())?
142 .unwrap_or(true);
143 let deletion_protection =
144 parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?
145 .unwrap_or(false);
146 let port = optional_i32_param(request, "Port")?
147 .unwrap_or_else(|| default_port_for_engine(&engine));
148 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
149
150 let db_parameter_group_name = optional_param(request, "DBParameterGroupName")
151 .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
152
153 let backup_retention_period =
154 optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
155 let preferred_backup_window = optional_param(request, "PreferredBackupWindow")
156 .unwrap_or_else(|| "03:00-04:00".to_string());
157 let option_group_name = optional_param(request, "OptionGroupName");
158 let multi_az =
159 parse_optional_bool(optional_param(request, "MultiAZ").as_deref())?.unwrap_or(false);
160
161 validate_create_request(
162 &db_instance_identifier,
163 allocated_storage,
164 &db_instance_class,
165 &engine,
166 &engine_version,
167 port,
168 )?;
169
170 {
171 let mut state = self.state.write();
172 if !state.begin_instance_creation(&db_instance_identifier) {
173 return Err(AwsServiceError::aws_error(
174 StatusCode::BAD_REQUEST,
175 "DBInstanceAlreadyExists",
176 format!("DBInstance {} already exists.", db_instance_identifier),
177 ));
178 }
179 if let Some(ref pg_name) = db_parameter_group_name {
181 if !state.parameter_groups.contains_key(pg_name) {
182 state.cancel_instance_creation(&db_instance_identifier);
183 return Err(AwsServiceError::aws_error(
184 StatusCode::NOT_FOUND,
185 "DBParameterGroupNotFound",
186 format!("DBParameterGroup {} not found.", pg_name),
187 ));
188 }
189 }
190 }
191
192 let runtime = self.require_runtime()?;
193
194 let logical_db_name = db_name
195 .clone()
196 .unwrap_or_else(|| default_db_name(&engine).to_string());
197 let running = runtime
198 .ensure_postgres(
199 &db_instance_identifier,
200 &engine,
201 &engine_version,
202 &master_username,
203 &master_user_password,
204 &logical_db_name,
205 )
206 .await
207 .map_err(|error| {
208 self.state
209 .write()
210 .cancel_instance_creation(&db_instance_identifier);
211 runtime_error_to_service_error(error)
212 })?;
213
214 let mut state = self.state.write();
215 let created_at = Utc::now();
216 let instance = DbInstance {
217 db_instance_identifier: db_instance_identifier.clone(),
218 db_instance_arn: state.db_instance_arn(&db_instance_identifier),
219 db_instance_class: db_instance_class.clone(),
220 engine: engine.clone(),
221 engine_version: engine_version.clone(),
222 db_instance_status: "available".to_string(),
223 master_username: master_username.clone(),
224 db_name: db_name.clone(),
225 endpoint_address: "127.0.0.1".to_string(),
226 port: i32::from(running.host_port),
227 allocated_storage,
228 publicly_accessible,
229 deletion_protection,
230 created_at,
231 dbi_resource_id: state.next_dbi_resource_id(),
232 master_user_password,
233 container_id: running.container_id,
234 host_port: running.host_port,
235 tags: Vec::new(),
236 read_replica_source_db_instance_identifier: None,
237 read_replica_db_instance_identifiers: Vec::new(),
238 vpc_security_group_ids,
239 db_parameter_group_name,
240 backup_retention_period,
241 preferred_backup_window,
242 latest_restorable_time: if backup_retention_period > 0 {
243 Some(created_at)
244 } else {
245 None
246 },
247 option_group_name,
248 multi_az,
249 pending_modified_values: None,
250 };
251 state.finish_instance_creation(instance.clone());
252
253 Ok(AwsResponse::xml(
254 StatusCode::OK,
255 xml_wrap(
256 "CreateDBInstance",
257 &format!(
258 "<DBInstance>{}</DBInstance>",
259 db_instance_xml(&instance, Some("creating"))
260 ),
261 &request.request_id,
262 ),
263 ))
264 }
265
266 async fn delete_db_instance(
267 &self,
268 request: &AwsRequest,
269 ) -> Result<AwsResponse, AwsServiceError> {
270 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
271 let skip_final_snapshot =
272 parse_optional_bool(optional_param(request, "SkipFinalSnapshot").as_deref())?
273 .unwrap_or(false);
274 let final_db_snapshot_identifier = optional_param(request, "FinalDBSnapshotIdentifier");
275
276 if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
277 return Err(AwsServiceError::aws_error(
278 StatusCode::BAD_REQUEST,
279 "InvalidParameterCombination",
280 "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
281 ));
282 }
283 if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
284 return Err(AwsServiceError::aws_error(
285 StatusCode::BAD_REQUEST,
286 "InvalidParameterCombination",
287 "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
288 ));
289 }
290
291 {
293 let state = self.state.read();
294 if let Some(instance) = state.instances.get(&db_instance_identifier) {
295 if instance.deletion_protection {
296 return Err(AwsServiceError::aws_error(
297 StatusCode::BAD_REQUEST,
298 "InvalidDBInstanceState",
299 format!(
300 "DBInstance {} cannot be deleted because deletion protection is enabled.",
301 db_instance_identifier
302 ),
303 ));
304 }
305 } else {
306 return Err(db_instance_not_found(&db_instance_identifier));
307 }
308 }
309
310 if let Some(ref snapshot_id) = final_db_snapshot_identifier {
311 self.create_final_db_snapshot(&db_instance_identifier, snapshot_id)
312 .await?;
313 }
314
315 let instance = {
316 let mut state = self.state.write();
317 let instance = state
318 .instances
319 .remove(&db_instance_identifier)
320 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
321
322 if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
323 if let Some(source) = state.instances.get_mut(source_id) {
324 source
325 .read_replica_db_instance_identifiers
326 .retain(|id| id != &db_instance_identifier);
327 }
328 }
329
330 for replica_id in &instance.read_replica_db_instance_identifiers {
331 if let Some(replica) = state.instances.get_mut(replica_id) {
332 replica.read_replica_source_db_instance_identifier = None;
333 }
334 }
335
336 instance
337 };
338
339 if let Some(runtime) = &self.runtime {
340 runtime.stop_container(&db_instance_identifier).await;
341 }
342
343 Ok(AwsResponse::xml(
344 StatusCode::OK,
345 xml_wrap(
346 "DeleteDBInstance",
347 &format!(
348 "<DBInstance>{}</DBInstance>",
349 db_instance_xml(&instance, Some("deleting"))
350 ),
351 &request.request_id,
352 ),
353 ))
354 }
355
356 async fn create_final_db_snapshot(
362 &self,
363 db_instance_identifier: &str,
364 snapshot_id: &str,
365 ) -> Result<(), AwsServiceError> {
366 let runtime = self.runtime.as_ref().ok_or_else(|| {
367 AwsServiceError::aws_error(
368 StatusCode::SERVICE_UNAVAILABLE,
369 "InvalidParameterValue",
370 "Docker/Podman is required for RDS snapshots but is not available",
371 )
372 })?;
373
374 let (instance_for_snapshot, db_name) = {
375 let state = self.state.read();
376
377 if state.snapshots.contains_key(snapshot_id) {
378 return Err(AwsServiceError::aws_error(
379 StatusCode::CONFLICT,
380 "DBSnapshotAlreadyExists",
381 format!("DBSnapshot {snapshot_id} already exists."),
382 ));
383 }
384
385 let instance = state
386 .instances
387 .get(db_instance_identifier)
388 .cloned()
389 .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
390
391 let default_db = default_db_name(&instance.engine);
392 let db_name = instance
393 .db_name
394 .as_deref()
395 .unwrap_or(default_db)
396 .to_string();
397
398 (instance, db_name)
399 };
400
401 let dump_data = runtime
402 .dump_database(
403 db_instance_identifier,
404 &instance_for_snapshot.engine,
405 &instance_for_snapshot.master_username,
406 &instance_for_snapshot.master_user_password,
407 &db_name,
408 )
409 .await
410 .map_err(runtime_error_to_service_error)?;
411
412 let mut state = self.state.write();
413
414 if state.snapshots.contains_key(snapshot_id) {
415 return Err(AwsServiceError::aws_error(
416 StatusCode::CONFLICT,
417 "DBSnapshotAlreadyExists",
418 format!("DBSnapshot {snapshot_id} already exists."),
419 ));
420 }
421
422 let snapshot_arn = state.db_snapshot_arn(snapshot_id);
423
424 let snapshot = DbSnapshot {
425 db_snapshot_identifier: snapshot_id.to_string(),
426 db_snapshot_arn: snapshot_arn,
427 db_instance_identifier: db_instance_identifier.to_string(),
428 snapshot_create_time: Utc::now(),
429 engine: instance_for_snapshot.engine.clone(),
430 engine_version: instance_for_snapshot.engine_version.clone(),
431 allocated_storage: instance_for_snapshot.allocated_storage,
432 status: "available".to_string(),
433 port: instance_for_snapshot.port,
434 master_username: instance_for_snapshot.master_username.clone(),
435 db_name: instance_for_snapshot.db_name.clone(),
436 dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
437 snapshot_type: "manual".to_string(),
438 master_user_password: instance_for_snapshot.master_user_password.clone(),
439 tags: Vec::new(),
440 dump_data,
441 };
442
443 state.snapshots.insert(snapshot_id.to_string(), snapshot);
444 Ok(())
445 }
446
447 fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
448 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
449 let db_instance_class = optional_param(request, "DBInstanceClass");
450 let deletion_protection =
451 parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?;
452 let apply_immediately =
453 parse_optional_bool(optional_param(request, "ApplyImmediately").as_deref())?;
454
455 let vpc_security_group_ids = {
457 let mut ids = Vec::new();
458 for index in 1.. {
459 let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
460 match optional_param(request, &sg_id_name) {
461 Some(sg_id) => ids.push(sg_id),
462 None => break,
463 }
464 }
465 if ids.is_empty() {
466 None
467 } else {
468 Some(ids)
469 }
470 };
471
472 if db_instance_class.is_none()
473 && deletion_protection.is_none()
474 && vpc_security_group_ids.is_none()
475 {
476 return Err(AwsServiceError::aws_error(
477 StatusCode::BAD_REQUEST,
478 "InvalidParameterCombination",
479 "At least one supported mutable field must be provided.",
480 ));
481 }
482 if let Some(ref class) = db_instance_class {
483 validate_db_instance_class(class)?;
484 }
485
486 let mut state = self.state.write();
487 let instance = state
488 .instances
489 .get_mut(&db_instance_identifier)
490 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
491
492 if apply_immediately == Some(false) {
494 let pending = instance
495 .pending_modified_values
496 .get_or_insert(Default::default());
497 if let Some(class) = db_instance_class {
498 pending.db_instance_class = Some(class);
499 }
500 if let Some(deletion_protection) = deletion_protection {
503 instance.deletion_protection = deletion_protection;
504 }
505 if let Some(security_group_ids) = vpc_security_group_ids {
506 instance.vpc_security_group_ids = security_group_ids;
507 }
508 } else {
509 if let Some(class) = db_instance_class {
511 instance.db_instance_class = class;
512 }
513 if let Some(deletion_protection) = deletion_protection {
514 instance.deletion_protection = deletion_protection;
515 }
516 if let Some(security_group_ids) = vpc_security_group_ids {
517 instance.vpc_security_group_ids = security_group_ids;
518 }
519 }
520
521 Ok(AwsResponse::xml(
522 StatusCode::OK,
523 xml_wrap(
524 "ModifyDBInstance",
525 &format!(
526 "<DBInstance>{}</DBInstance>",
527 db_instance_xml(instance, Some("modifying"))
528 ),
529 &request.request_id,
530 ),
531 ))
532 }
533
534 async fn reboot_db_instance(
535 &self,
536 request: &AwsRequest,
537 ) -> Result<AwsResponse, AwsServiceError> {
538 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
539 let force_failover =
540 parse_optional_bool(optional_param(request, "ForceFailover").as_deref())?;
541 if force_failover == Some(true) {
542 return Err(AwsServiceError::aws_error(
543 StatusCode::BAD_REQUEST,
544 "InvalidParameterCombination",
545 "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
546 ));
547 }
548
549 let instance = {
550 let state = self.state.read();
551 state
552 .instances
553 .get(&db_instance_identifier)
554 .cloned()
555 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
556 };
557
558 let runtime = self.require_runtime()?;
559
560 let running = runtime
561 .restart_container(
562 &db_instance_identifier,
563 &instance.engine,
564 &instance.master_username,
565 &instance.master_user_password,
566 instance
567 .db_name
568 .as_deref()
569 .unwrap_or(default_db_name(&instance.engine)),
570 )
571 .await
572 .map_err(runtime_error_to_service_error)?;
573
574 let instance = {
575 let mut state = self.state.write();
576 let instance = state
577 .instances
578 .get_mut(&db_instance_identifier)
579 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
580 instance.host_port = running.host_port;
581 instance.port = i32::from(running.host_port);
582
583 if let Some(pending) = instance.pending_modified_values.take() {
585 if let Some(class) = pending.db_instance_class {
586 instance.db_instance_class = class;
587 }
588 if let Some(allocated_storage) = pending.allocated_storage {
589 instance.allocated_storage = allocated_storage;
590 }
591 if let Some(backup_retention_period) = pending.backup_retention_period {
592 instance.backup_retention_period = backup_retention_period;
593 }
594 if let Some(multi_az) = pending.multi_az {
595 instance.multi_az = multi_az;
596 }
597 if let Some(engine_version) = pending.engine_version {
598 instance.engine_version = engine_version;
599 }
600 if let Some(master_user_password) = pending.master_user_password {
601 instance.master_user_password = master_user_password;
602 }
603 }
604
605 instance.clone()
606 };
607
608 Ok(AwsResponse::xml(
609 StatusCode::OK,
610 xml_wrap(
611 "RebootDBInstance",
612 &format!(
613 "<DBInstance>{}</DBInstance>",
614 db_instance_xml(&instance, Some("rebooting"))
615 ),
616 &request.request_id,
617 ),
618 ))
619 }
620
621 fn describe_db_engine_versions(
622 &self,
623 request: &AwsRequest,
624 ) -> Result<AwsResponse, AwsServiceError> {
625 let engine = optional_param(request, "Engine");
626 let engine_version = optional_param(request, "EngineVersion");
627 let family = optional_param(request, "DBParameterGroupFamily");
628 let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
629
630 let mut versions = filter_engine_versions(
631 &default_engine_versions(),
632 &engine,
633 &engine_version,
634 &family,
635 );
636
637 if default_only.unwrap_or(false) {
638 versions.truncate(1);
639 }
640
641 Ok(AwsResponse::xml(
642 StatusCode::OK,
643 xml_wrap(
644 "DescribeDBEngineVersions",
645 &format!(
646 "<DBEngineVersions>{}</DBEngineVersions>",
647 versions.iter().map(engine_version_xml).collect::<String>()
648 ),
649 &request.request_id,
650 ),
651 ))
652 }
653
654 fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
655 let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
656 let marker = optional_param(request, "Marker");
657 let max_records = optional_param(request, "MaxRecords");
658
659 let state = self.state.read();
660
661 if let Some(identifier) = db_instance_identifier {
663 let instance = state
664 .instances
665 .get(&identifier)
666 .cloned()
667 .ok_or_else(|| db_instance_not_found(&identifier))?;
668
669 return Ok(AwsResponse::xml(
670 StatusCode::OK,
671 xml_wrap(
672 "DescribeDBInstances",
673 &format!(
674 "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
675 db_instance_xml(&instance, None)
676 ),
677 &request.request_id,
678 ),
679 ));
680 }
681
682 let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
684 instances.sort_by(|a, b| {
685 a.created_at
686 .cmp(&b.created_at)
687 .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
688 });
689
690 let paginated = paginate(instances, marker, max_records, |inst| {
692 &inst.db_instance_identifier
693 })?;
694
695 let marker_xml = paginated
696 .next_marker
697 .as_ref()
698 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
699 .unwrap_or_default();
700
701 Ok(AwsResponse::xml(
702 StatusCode::OK,
703 xml_wrap(
704 "DescribeDBInstances",
705 &format!(
706 "<DBInstances>{}</DBInstances>{}",
707 paginated
708 .items
709 .iter()
710 .map(|instance| {
711 format!(
712 "<DBInstance>{}</DBInstance>",
713 db_instance_xml(instance, None)
714 )
715 })
716 .collect::<String>(),
717 marker_xml
718 ),
719 &request.request_id,
720 ),
721 ))
722 }
723
724 fn describe_orderable_db_instance_options(
725 &self,
726 request: &AwsRequest,
727 ) -> Result<AwsResponse, AwsServiceError> {
728 let engine = optional_param(request, "Engine");
729 let engine_version = optional_param(request, "EngineVersion");
730 let db_instance_class = optional_param(request, "DBInstanceClass");
731 let license_model = optional_param(request, "LicenseModel");
732 let vpc = parse_optional_bool(optional_param(request, "Vpc").as_deref())?;
733
734 let options = filter_orderable_options(
735 &default_orderable_options(),
736 &engine,
737 &engine_version,
738 &db_instance_class,
739 &license_model,
740 vpc,
741 );
742
743 Ok(AwsResponse::xml(
744 StatusCode::OK,
745 xml_wrap(
746 "DescribeOrderableDBInstanceOptions",
747 &format!(
748 "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
749 options.iter().map(orderable_option_xml).collect::<String>()
750 ),
751 &request.request_id,
752 ),
753 ))
754 }
755
756 fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
757 let resource_name = required_param(request, "ResourceName")?;
758 let tags = parse_tags(request)?;
759
760 if tags.is_empty() {
761 return Err(AwsServiceError::aws_error(
762 StatusCode::BAD_REQUEST,
763 "MissingParameter",
764 "The request must contain the parameter Tags.",
765 ));
766 }
767
768 let mut state = self.state.write();
769 let instance = find_instance_by_arn_mut(&mut state, &resource_name)?;
770 merge_tags(&mut instance.tags, &tags);
771
772 Ok(AwsResponse::xml(
773 StatusCode::OK,
774 xml_wrap("AddTagsToResource", "", &request.request_id),
775 ))
776 }
777
778 fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
779 let resource_name = required_param(request, "ResourceName")?;
780 if query_param_prefix_exists(request, "Filters.") {
781 return Err(AwsServiceError::aws_error(
782 StatusCode::BAD_REQUEST,
783 "InvalidParameterValue",
784 "Filters are not yet supported for ListTagsForResource.",
785 ));
786 }
787
788 let state = self.state.read();
789 let instance = find_instance_by_arn(&state, &resource_name)?;
790 let tag_xml = instance.tags.iter().map(tag_xml).collect::<String>();
791
792 Ok(AwsResponse::xml(
793 StatusCode::OK,
794 xml_wrap(
795 "ListTagsForResource",
796 &format!("<TagList>{tag_xml}</TagList>"),
797 &request.request_id,
798 ),
799 ))
800 }
801
802 fn remove_tags_from_resource(
803 &self,
804 request: &AwsRequest,
805 ) -> Result<AwsResponse, AwsServiceError> {
806 let resource_name = required_param(request, "ResourceName")?;
807 let tag_keys = parse_tag_keys(request)?;
808
809 if tag_keys.is_empty() {
810 return Err(AwsServiceError::aws_error(
811 StatusCode::BAD_REQUEST,
812 "MissingParameter",
813 "The request must contain the parameter TagKeys.",
814 ));
815 }
816
817 let mut state = self.state.write();
818 let instance = find_instance_by_arn_mut(&mut state, &resource_name)?;
819 instance
820 .tags
821 .retain(|tag| !tag_keys.iter().any(|key| key == &tag.key));
822
823 Ok(AwsResponse::xml(
824 StatusCode::OK,
825 xml_wrap("RemoveTagsFromResource", "", &request.request_id),
826 ))
827 }
828
829 async fn create_db_snapshot(
830 &self,
831 request: &AwsRequest,
832 ) -> Result<AwsResponse, AwsServiceError> {
833 let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
834 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
835
836 let runtime = self.runtime.as_ref().ok_or_else(|| {
837 AwsServiceError::aws_error(
838 StatusCode::SERVICE_UNAVAILABLE,
839 "InvalidParameterValue",
840 "Docker/Podman is required for RDS snapshots but is not available",
841 )
842 })?;
843
844 let (instance, db_name) = {
845 let state = self.state.write();
846
847 if state.snapshots.contains_key(&db_snapshot_identifier) {
848 return Err(AwsServiceError::aws_error(
849 StatusCode::CONFLICT,
850 "DBSnapshotAlreadyExists",
851 format!("DBSnapshot {db_snapshot_identifier} already exists."),
852 ));
853 }
854
855 let instance = state
856 .instances
857 .get(&db_instance_identifier)
858 .cloned()
859 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
860
861 let default_db = default_db_name(&instance.engine);
862 let db_name = instance
863 .db_name
864 .as_deref()
865 .unwrap_or(default_db)
866 .to_string();
867
868 (instance, db_name)
869 };
870
871 let dump_data = runtime
872 .dump_database(
873 &db_instance_identifier,
874 &instance.engine,
875 &instance.master_username,
876 &instance.master_user_password,
877 &db_name,
878 )
879 .await
880 .map_err(runtime_error_to_service_error)?;
881
882 let mut state = self.state.write();
883
884 if state.snapshots.contains_key(&db_snapshot_identifier) {
885 return Err(AwsServiceError::aws_error(
886 StatusCode::CONFLICT,
887 "DBSnapshotAlreadyExists",
888 format!("DBSnapshot {db_snapshot_identifier} already exists."),
889 ));
890 }
891
892 let snapshot = DbSnapshot {
893 db_snapshot_identifier: db_snapshot_identifier.clone(),
894 db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
895 db_instance_identifier: instance.db_instance_identifier.clone(),
896 snapshot_create_time: Utc::now(),
897 engine: instance.engine.clone(),
898 engine_version: instance.engine_version.clone(),
899 allocated_storage: instance.allocated_storage,
900 status: "available".to_string(),
901 port: instance.port,
902 master_username: instance.master_username.clone(),
903 db_name: instance.db_name.clone(),
904 dbi_resource_id: instance.dbi_resource_id.clone(),
905 snapshot_type: "manual".to_string(),
906 master_user_password: instance.master_user_password.clone(),
907 tags: Vec::new(),
908 dump_data,
909 };
910
911 state
912 .snapshots
913 .insert(db_snapshot_identifier, snapshot.clone());
914
915 Ok(AwsResponse::xml(
916 StatusCode::OK,
917 xml_wrap(
918 "CreateDBSnapshot",
919 &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
920 &request.request_id,
921 ),
922 ))
923 }
924
925 fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
926 let db_snapshot_identifier = optional_param(request, "DBSnapshotIdentifier");
927 let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
928 let marker = optional_param(request, "Marker");
929 let max_records = optional_param(request, "MaxRecords");
930
931 if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
932 return Err(AwsServiceError::aws_error(
933 StatusCode::BAD_REQUEST,
934 "InvalidParameterCombination",
935 "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
936 ));
937 }
938
939 let state = self.state.read();
940
941 if let Some(snapshot_id) = db_snapshot_identifier {
943 let snapshot = state
944 .snapshots
945 .get(&snapshot_id)
946 .cloned()
947 .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
948
949 return Ok(AwsResponse::xml(
950 StatusCode::OK,
951 xml_wrap(
952 "DescribeDBSnapshots",
953 &format!(
954 "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
955 db_snapshot_xml(&snapshot)
956 ),
957 &request.request_id,
958 ),
959 ));
960 }
961
962 let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
964 state
965 .snapshots
966 .values()
967 .filter(|s| s.db_instance_identifier == instance_id)
968 .cloned()
969 .collect()
970 } else {
971 state.snapshots.values().cloned().collect()
972 };
973
974 snapshots.sort_by(|a, b| {
976 a.snapshot_create_time
977 .cmp(&b.snapshot_create_time)
978 .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
979 });
980
981 let paginated = paginate(snapshots, marker, max_records, |snap| {
983 &snap.db_snapshot_identifier
984 })?;
985
986 let marker_xml = paginated
987 .next_marker
988 .as_ref()
989 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
990 .unwrap_or_default();
991
992 Ok(AwsResponse::xml(
993 StatusCode::OK,
994 xml_wrap(
995 "DescribeDBSnapshots",
996 &format!(
997 "<DBSnapshots>{}</DBSnapshots>{}",
998 paginated
999 .items
1000 .iter()
1001 .map(|snapshot| format!(
1002 "<DBSnapshot>{}</DBSnapshot>",
1003 db_snapshot_xml(snapshot)
1004 ))
1005 .collect::<String>(),
1006 marker_xml
1007 ),
1008 &request.request_id,
1009 ),
1010 ))
1011 }
1012
1013 fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1014 let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1015
1016 let mut state = self.state.write();
1017
1018 let snapshot = state
1019 .snapshots
1020 .remove(&db_snapshot_identifier)
1021 .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1022
1023 Ok(AwsResponse::xml(
1024 StatusCode::OK,
1025 xml_wrap(
1026 "DeleteDBSnapshot",
1027 &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1028 &request.request_id,
1029 ),
1030 ))
1031 }
1032
1033 async fn restore_db_instance_from_db_snapshot(
1034 &self,
1035 request: &AwsRequest,
1036 ) -> Result<AwsResponse, AwsServiceError> {
1037 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1038 let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1039 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
1040
1041 let runtime = self.require_runtime()?;
1042
1043 let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
1044 let mut state = self.state.write();
1045
1046 if !state.begin_instance_creation(&db_instance_identifier) {
1047 return Err(AwsServiceError::aws_error(
1048 StatusCode::CONFLICT,
1049 "DBInstanceAlreadyExists",
1050 format!("DBInstance {db_instance_identifier} already exists."),
1051 ));
1052 }
1053
1054 let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
1055 Some(s) => s,
1056 None => {
1057 state.cancel_instance_creation(&db_instance_identifier);
1058 return Err(db_snapshot_not_found(&db_snapshot_identifier));
1059 }
1060 };
1061
1062 let dbi_resource_id = state.next_dbi_resource_id();
1063 let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
1064 let created_at = Utc::now();
1065
1066 (snapshot, dbi_resource_id, db_instance_arn, created_at)
1067 };
1068
1069 let db_name = snapshot
1070 .db_name
1071 .as_deref()
1072 .unwrap_or(default_db_name(&snapshot.engine));
1073 let running = match runtime
1074 .ensure_postgres(
1075 &db_instance_identifier,
1076 &snapshot.engine,
1077 &snapshot.engine_version,
1078 &snapshot.master_username,
1079 &snapshot.master_user_password,
1080 db_name,
1081 )
1082 .await
1083 {
1084 Ok(running) => running,
1085 Err(e) => {
1086 self.state
1087 .write()
1088 .cancel_instance_creation(&db_instance_identifier);
1089 return Err(runtime_error_to_service_error(e));
1090 }
1091 };
1092
1093 if let Err(e) = runtime
1094 .restore_database(
1095 &db_instance_identifier,
1096 &snapshot.engine,
1097 &snapshot.master_username,
1098 &snapshot.master_user_password,
1099 db_name,
1100 &snapshot.dump_data,
1101 )
1102 .await
1103 {
1104 self.state
1105 .write()
1106 .cancel_instance_creation(&db_instance_identifier);
1107 runtime.stop_container(&db_instance_identifier).await;
1108 return Err(runtime_error_to_service_error(e));
1109 }
1110
1111 let instance = build_restored_instance(
1112 &db_instance_identifier,
1113 db_instance_arn,
1114 dbi_resource_id,
1115 created_at,
1116 vpc_security_group_ids,
1117 &snapshot,
1118 &running,
1119 );
1120
1121 self.state
1122 .write()
1123 .finish_instance_creation(instance.clone());
1124
1125 Ok(AwsResponse::xml(
1126 StatusCode::OK,
1127 xml_wrap(
1128 "RestoreDBInstanceFromDBSnapshot",
1129 &format!(
1130 "<DBInstance>{}</DBInstance>",
1131 db_instance_xml(&instance, None)
1132 ),
1133 &request.request_id,
1134 ),
1135 ))
1136 }
1137
1138 async fn create_db_instance_read_replica(
1139 &self,
1140 request: &AwsRequest,
1141 ) -> Result<AwsResponse, AwsServiceError> {
1142 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1143 let source_db_instance_identifier = required_param(request, "SourceDBInstanceIdentifier")?;
1144
1145 let runtime = self.runtime.as_ref().ok_or_else(|| {
1146 AwsServiceError::aws_error(
1147 StatusCode::SERVICE_UNAVAILABLE,
1148 "InvalidParameterValue",
1149 "Docker/Podman is required for RDS read replicas but is not available",
1150 )
1151 })?;
1152
1153 let (source_instance, db_name) = {
1154 let mut state = self.state.write();
1155
1156 if !state.begin_instance_creation(&db_instance_identifier) {
1157 return Err(AwsServiceError::aws_error(
1158 StatusCode::CONFLICT,
1159 "DBInstanceAlreadyExists",
1160 format!("DBInstance {db_instance_identifier} already exists."),
1161 ));
1162 }
1163
1164 let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
1165 {
1166 Some(inst) => inst,
1167 None => {
1168 state.cancel_instance_creation(&db_instance_identifier);
1169 return Err(db_instance_not_found(&source_db_instance_identifier));
1170 }
1171 };
1172
1173 let default_db = default_db_name(&source_instance.engine);
1174 let db_name = source_instance
1175 .db_name
1176 .as_deref()
1177 .unwrap_or(default_db)
1178 .to_string();
1179
1180 (source_instance, db_name)
1181 };
1182
1183 let dump_data = match runtime
1184 .dump_database(
1185 &source_db_instance_identifier,
1186 &source_instance.engine,
1187 &source_instance.master_username,
1188 &source_instance.master_user_password,
1189 &db_name,
1190 )
1191 .await
1192 {
1193 Ok(data) => data,
1194 Err(e) => {
1195 self.state
1196 .write()
1197 .cancel_instance_creation(&db_instance_identifier);
1198 return Err(runtime_error_to_service_error(e));
1199 }
1200 };
1201
1202 let dbi_resource_id = self.state.read().next_dbi_resource_id();
1203 let db_instance_arn = self.state.read().db_instance_arn(&db_instance_identifier);
1204 let created_at = Utc::now();
1205
1206 let running = match runtime
1207 .ensure_postgres(
1208 &db_instance_identifier,
1209 &source_instance.engine,
1210 &source_instance.engine_version,
1211 &source_instance.master_username,
1212 &source_instance.master_user_password,
1213 &db_name,
1214 )
1215 .await
1216 {
1217 Ok(running) => running,
1218 Err(e) => {
1219 self.state
1220 .write()
1221 .cancel_instance_creation(&db_instance_identifier);
1222 return Err(runtime_error_to_service_error(e));
1223 }
1224 };
1225
1226 if let Err(e) = runtime
1227 .restore_database(
1228 &db_instance_identifier,
1229 &source_instance.engine,
1230 &source_instance.master_username,
1231 &source_instance.master_user_password,
1232 &db_name,
1233 &dump_data,
1234 )
1235 .await
1236 {
1237 self.state
1238 .write()
1239 .cancel_instance_creation(&db_instance_identifier);
1240 runtime.stop_container(&db_instance_identifier).await;
1241 return Err(runtime_error_to_service_error(e));
1242 }
1243
1244 let replica = build_read_replica_instance(
1245 &db_instance_identifier,
1246 db_instance_arn,
1247 dbi_resource_id,
1248 created_at,
1249 &source_db_instance_identifier,
1250 &source_instance,
1251 &running,
1252 );
1253
1254 let source_missing = {
1255 let mut state = self.state.write();
1256 match state.instances.get_mut(&source_db_instance_identifier) {
1257 Some(source) => {
1258 source
1259 .read_replica_db_instance_identifiers
1260 .push(db_instance_identifier.clone());
1261 state.finish_instance_creation(replica.clone());
1262 false
1263 }
1264 None => {
1265 state.cancel_instance_creation(&db_instance_identifier);
1266 true
1267 }
1268 }
1269 };
1270
1271 if source_missing {
1272 runtime.stop_container(&db_instance_identifier).await;
1273 return Err(db_instance_not_found(&source_db_instance_identifier));
1274 }
1275
1276 Ok(AwsResponse::xml(
1277 StatusCode::OK,
1278 xml_wrap(
1279 "CreateDBInstanceReadReplica",
1280 &format!(
1281 "<DBInstance>{}</DBInstance>",
1282 db_instance_xml(&replica, None)
1283 ),
1284 &request.request_id,
1285 ),
1286 ))
1287 }
1288
1289 fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1290 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1291 let db_subnet_group_description = required_param(request, "DBSubnetGroupDescription")?;
1292 let subnet_ids = parse_subnet_ids(request)?;
1293
1294 if subnet_ids.is_empty() {
1295 return Err(AwsServiceError::aws_error(
1296 StatusCode::BAD_REQUEST,
1297 "InvalidParameterValue",
1298 "At least one subnet must be specified.",
1299 ));
1300 }
1301
1302 if subnet_ids.len() < 2 {
1303 return Err(AwsServiceError::aws_error(
1304 StatusCode::BAD_REQUEST,
1305 "DBSubnetGroupDoesNotCoverEnoughAZs",
1306 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1307 ));
1308 }
1309
1310 let mut state = self.state.write();
1311
1312 if state.subnet_groups.contains_key(&db_subnet_group_name) {
1313 return Err(AwsServiceError::aws_error(
1314 StatusCode::CONFLICT,
1315 "DBSubnetGroupAlreadyExists",
1316 format!("DBSubnetGroup {db_subnet_group_name} already exists."),
1317 ));
1318 }
1319
1320 let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
1321 let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1322 .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
1323 .collect();
1324
1325 let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1327 if unique_azs.len() < 2 {
1328 return Err(AwsServiceError::aws_error(
1329 StatusCode::BAD_REQUEST,
1330 "DBSubnetGroupDoesNotCoverEnoughAZs",
1331 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1332 ));
1333 }
1334
1335 let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
1336 let tags = parse_tags(request)?;
1337
1338 let subnet_group = DbSubnetGroup {
1339 db_subnet_group_name: db_subnet_group_name.clone(),
1340 db_subnet_group_arn,
1341 db_subnet_group_description,
1342 vpc_id,
1343 subnet_ids,
1344 subnet_availability_zones,
1345 tags,
1346 };
1347
1348 state
1349 .subnet_groups
1350 .insert(db_subnet_group_name, subnet_group.clone());
1351
1352 Ok(AwsResponse::xml(
1353 StatusCode::OK,
1354 xml_wrap(
1355 "CreateDBSubnetGroup",
1356 &format!(
1357 "<DBSubnetGroup>{}</DBSubnetGroup>",
1358 db_subnet_group_xml(&subnet_group)
1359 ),
1360 &request.request_id,
1361 ),
1362 ))
1363 }
1364
1365 fn describe_db_subnet_groups(
1366 &self,
1367 request: &AwsRequest,
1368 ) -> Result<AwsResponse, AwsServiceError> {
1369 let db_subnet_group_name = optional_param(request, "DBSubnetGroupName");
1370 let marker = optional_param(request, "Marker");
1371 let max_records = optional_param(request, "MaxRecords");
1372
1373 let state = self.state.read();
1374
1375 if let Some(name) = db_subnet_group_name {
1377 let sg = state.subnet_groups.get(&name).ok_or_else(|| {
1378 AwsServiceError::aws_error(
1379 StatusCode::NOT_FOUND,
1380 "DBSubnetGroupNotFoundFault",
1381 format!("DBSubnetGroup {} not found.", name),
1382 )
1383 })?;
1384
1385 return Ok(AwsResponse::xml(
1386 StatusCode::OK,
1387 xml_wrap(
1388 "DescribeDBSubnetGroups",
1389 &format!(
1390 "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
1391 db_subnet_group_xml(sg)
1392 ),
1393 &request.request_id,
1394 ),
1395 ));
1396 }
1397
1398 let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
1400 subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
1401
1402 let paginated = paginate(subnet_groups, marker, max_records, |sg| {
1404 &sg.db_subnet_group_name
1405 })?;
1406
1407 let marker_xml = paginated
1408 .next_marker
1409 .as_ref()
1410 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1411 .unwrap_or_default();
1412
1413 let body = paginated
1414 .items
1415 .iter()
1416 .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
1417 .collect::<Vec<_>>()
1418 .join("");
1419
1420 Ok(AwsResponse::xml(
1421 StatusCode::OK,
1422 xml_wrap(
1423 "DescribeDBSubnetGroups",
1424 &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
1425 &request.request_id,
1426 ),
1427 ))
1428 }
1429
1430 fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1431 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1432
1433 let mut state = self.state.write();
1434
1435 if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
1436 return Err(AwsServiceError::aws_error(
1437 StatusCode::NOT_FOUND,
1438 "DBSubnetGroupNotFoundFault",
1439 format!("DBSubnetGroup {db_subnet_group_name} not found."),
1440 ));
1441 }
1442
1443 Ok(AwsResponse::xml(
1444 StatusCode::OK,
1445 xml_wrap("DeleteDBSubnetGroup", "", &request.request_id),
1446 ))
1447 }
1448
1449 fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1450 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1451 let subnet_ids = parse_subnet_ids(request)?;
1452
1453 if subnet_ids.is_empty() {
1454 return Err(AwsServiceError::aws_error(
1455 StatusCode::BAD_REQUEST,
1456 "InvalidParameterValue",
1457 "At least one subnet must be specified.",
1458 ));
1459 }
1460
1461 if subnet_ids.len() < 2 {
1462 return Err(AwsServiceError::aws_error(
1463 StatusCode::BAD_REQUEST,
1464 "DBSubnetGroupDoesNotCoverEnoughAZs",
1465 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1466 ));
1467 }
1468
1469 let mut state = self.state.write();
1470
1471 let region = state.region.clone();
1472
1473 let subnet_group = state
1474 .subnet_groups
1475 .get_mut(&db_subnet_group_name)
1476 .ok_or_else(|| {
1477 AwsServiceError::aws_error(
1478 StatusCode::NOT_FOUND,
1479 "DBSubnetGroupNotFoundFault",
1480 format!("DBSubnetGroup {db_subnet_group_name} not found."),
1481 )
1482 })?;
1483
1484 let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1485 .map(|i| format!("{}{}", ®ion, char::from(b'a' + (i % 6) as u8)))
1486 .collect();
1487
1488 let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1490 if unique_azs.len() < 2 {
1491 return Err(AwsServiceError::aws_error(
1492 StatusCode::BAD_REQUEST,
1493 "DBSubnetGroupDoesNotCoverEnoughAZs",
1494 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1495 ));
1496 }
1497
1498 subnet_group.subnet_ids = subnet_ids;
1499 subnet_group.subnet_availability_zones = subnet_availability_zones;
1500
1501 let subnet_group_clone = subnet_group.clone();
1502
1503 Ok(AwsResponse::xml(
1504 StatusCode::OK,
1505 xml_wrap(
1506 "ModifyDBSubnetGroup",
1507 &format!(
1508 "<DBSubnetGroup>{}</DBSubnetGroup>",
1509 db_subnet_group_xml(&subnet_group_clone)
1510 ),
1511 &request.request_id,
1512 ),
1513 ))
1514 }
1515
1516 fn create_db_parameter_group(
1517 &self,
1518 request: &AwsRequest,
1519 ) -> Result<AwsResponse, AwsServiceError> {
1520 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1521 let db_parameter_group_family = required_param(request, "DBParameterGroupFamily")?;
1522 let description = required_param(request, "Description")?;
1523
1524 let valid_families = [
1526 "postgres16",
1527 "postgres15",
1528 "postgres14",
1529 "postgres13",
1530 "mysql8.0",
1531 "mysql5.7",
1532 "mariadb10.11",
1533 "mariadb10.6",
1534 ];
1535
1536 if !valid_families.contains(&db_parameter_group_family.as_str()) {
1537 return Err(AwsServiceError::aws_error(
1538 StatusCode::BAD_REQUEST,
1539 "InvalidParameterValue",
1540 format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
1541 ));
1542 }
1543
1544 let mut state = self.state.write();
1545
1546 if state
1547 .parameter_groups
1548 .contains_key(&db_parameter_group_name)
1549 {
1550 return Err(AwsServiceError::aws_error(
1551 StatusCode::CONFLICT,
1552 "DBParameterGroupAlreadyExists",
1553 format!("DBParameterGroup {db_parameter_group_name} already exists."),
1554 ));
1555 }
1556
1557 let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
1558 let tags = parse_tags(request)?;
1559
1560 let parameter_group = DbParameterGroup {
1561 db_parameter_group_name: db_parameter_group_name.clone(),
1562 db_parameter_group_arn,
1563 db_parameter_group_family,
1564 description,
1565 parameters: std::collections::HashMap::new(),
1566 tags,
1567 };
1568
1569 state
1570 .parameter_groups
1571 .insert(db_parameter_group_name, parameter_group.clone());
1572
1573 Ok(AwsResponse::xml(
1574 StatusCode::OK,
1575 xml_wrap(
1576 "CreateDBParameterGroup",
1577 &format!(
1578 "<DBParameterGroup>{}</DBParameterGroup>",
1579 db_parameter_group_xml(¶meter_group)
1580 ),
1581 &request.request_id,
1582 ),
1583 ))
1584 }
1585
1586 fn describe_db_parameter_groups(
1587 &self,
1588 request: &AwsRequest,
1589 ) -> Result<AwsResponse, AwsServiceError> {
1590 let db_parameter_group_name = optional_param(request, "DBParameterGroupName");
1591 let marker = optional_param(request, "Marker");
1592 let max_records = optional_param(request, "MaxRecords");
1593
1594 let state = self.state.read();
1595
1596 if let Some(name) = db_parameter_group_name {
1598 let pg = state.parameter_groups.get(&name).ok_or_else(|| {
1599 AwsServiceError::aws_error(
1600 StatusCode::NOT_FOUND,
1601 "DBParameterGroupNotFound",
1602 format!("DBParameterGroup {} not found.", name),
1603 )
1604 })?;
1605
1606 return Ok(AwsResponse::xml(
1607 StatusCode::OK,
1608 xml_wrap(
1609 "DescribeDBParameterGroups",
1610 &format!(
1611 "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
1612 db_parameter_group_xml(pg)
1613 ),
1614 &request.request_id,
1615 ),
1616 ));
1617 }
1618
1619 let mut parameter_groups: Vec<DbParameterGroup> =
1621 state.parameter_groups.values().cloned().collect();
1622 parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
1623
1624 let paginated = paginate(parameter_groups, marker, max_records, |pg| {
1626 &pg.db_parameter_group_name
1627 })?;
1628
1629 let marker_xml = paginated
1630 .next_marker
1631 .as_ref()
1632 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1633 .unwrap_or_default();
1634
1635 let body = paginated
1636 .items
1637 .iter()
1638 .map(|pg| {
1639 format!(
1640 "<DBParameterGroup>{}</DBParameterGroup>",
1641 db_parameter_group_xml(pg)
1642 )
1643 })
1644 .collect::<Vec<_>>()
1645 .join("");
1646
1647 Ok(AwsResponse::xml(
1648 StatusCode::OK,
1649 xml_wrap(
1650 "DescribeDBParameterGroups",
1651 &format!(
1652 "<DBParameterGroups>{}</DBParameterGroups>{}",
1653 body, marker_xml
1654 ),
1655 &request.request_id,
1656 ),
1657 ))
1658 }
1659
1660 fn delete_db_parameter_group(
1661 &self,
1662 request: &AwsRequest,
1663 ) -> Result<AwsResponse, AwsServiceError> {
1664 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1665
1666 let mut state = self.state.write();
1667
1668 if db_parameter_group_name.starts_with("default.") {
1669 return Err(AwsServiceError::aws_error(
1670 StatusCode::BAD_REQUEST,
1671 "InvalidParameterValue",
1672 "Cannot delete default parameter groups.",
1673 ));
1674 }
1675
1676 if state
1677 .parameter_groups
1678 .remove(&db_parameter_group_name)
1679 .is_none()
1680 {
1681 return Err(AwsServiceError::aws_error(
1682 StatusCode::NOT_FOUND,
1683 "DBParameterGroupNotFound",
1684 format!("DBParameterGroup {db_parameter_group_name} not found."),
1685 ));
1686 }
1687
1688 Ok(AwsResponse::xml(
1689 StatusCode::OK,
1690 xml_wrap("DeleteDBParameterGroup", "", &request.request_id),
1691 ))
1692 }
1693
1694 fn modify_db_parameter_group(
1695 &self,
1696 request: &AwsRequest,
1697 ) -> Result<AwsResponse, AwsServiceError> {
1698 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1699
1700 let mut state = self.state.write();
1701
1702 let parameter_group = state
1703 .parameter_groups
1704 .get_mut(&db_parameter_group_name)
1705 .ok_or_else(|| {
1706 AwsServiceError::aws_error(
1707 StatusCode::NOT_FOUND,
1708 "DBParameterGroupNotFound",
1709 format!("DBParameterGroup {db_parameter_group_name} not found."),
1710 )
1711 })?;
1712
1713 if let Some(new_description) = optional_param(request, "Description") {
1714 parameter_group.description = new_description;
1715 }
1716
1717 let parameter_group_clone = parameter_group.clone();
1718
1719 Ok(AwsResponse::xml(
1720 StatusCode::OK,
1721 xml_wrap(
1722 "ModifyDBParameterGroup",
1723 &format!(
1724 "<DBParameterGroupName>{}</DBParameterGroupName>",
1725 xml_escape(¶meter_group_clone.db_parameter_group_name)
1726 ),
1727 &request.request_id,
1728 ),
1729 ))
1730 }
1731}
1732
1733fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
1734 fakecloud_core::query::optional_query_param(req, name)
1735}
1736
1737fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
1738 fakecloud_core::query::required_query_param(req, name)
1739}
1740
1741fn required_i32_param(req: &AwsRequest, name: &str) -> Result<i32, AwsServiceError> {
1742 let value = required_param(req, name)?;
1743 value.parse::<i32>().map_err(|_| {
1744 AwsServiceError::aws_error(
1745 StatusCode::BAD_REQUEST,
1746 "InvalidParameterValue",
1747 format!("Parameter {name} must be a valid integer."),
1748 )
1749 })
1750}
1751
1752fn optional_i32_param(req: &AwsRequest, name: &str) -> Result<Option<i32>, AwsServiceError> {
1753 optional_param(req, name)
1754 .map(|value| {
1755 value.parse::<i32>().map_err(|_| {
1756 AwsServiceError::aws_error(
1757 StatusCode::BAD_REQUEST,
1758 "InvalidParameterValue",
1759 format!("Parameter {name} must be a valid integer."),
1760 )
1761 })
1762 })
1763 .transpose()
1764}
1765
1766fn parse_tags(req: &AwsRequest) -> Result<Vec<RdsTag>, AwsServiceError> {
1767 let mut tags = Vec::new();
1768 for index in 1.. {
1769 let key_name = format!("Tags.Tag.{index}.Key");
1770 let value_name = format!("Tags.Tag.{index}.Value");
1771 let key = optional_param(req, &key_name);
1772 let value = optional_param(req, &value_name);
1773
1774 match (key, value) {
1775 (Some(key), Some(value)) => tags.push(RdsTag { key, value }),
1776 (None, None) => break,
1777 _ => {
1778 return Err(AwsServiceError::aws_error(
1779 StatusCode::BAD_REQUEST,
1780 "InvalidParameterValue",
1781 "Each tag must include both Key and Value.",
1782 ));
1783 }
1784 }
1785 }
1786
1787 Ok(tags)
1788}
1789
1790fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
1791 let mut keys = Vec::new();
1792 for index in 1.. {
1793 let key_name = format!("TagKeys.member.{index}");
1794 match optional_param(req, &key_name) {
1795 Some(key) => keys.push(key),
1796 None => break,
1797 }
1798 }
1799
1800 Ok(keys)
1801}
1802
1803fn parse_subnet_ids(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
1804 let mut subnet_ids = Vec::new();
1805 for index in 1.. {
1806 let subnet_id_name = format!("SubnetIds.SubnetIdentifier.{index}");
1807 match optional_param(req, &subnet_id_name) {
1808 Some(subnet_id) => subnet_ids.push(subnet_id),
1809 None => break,
1810 }
1811 }
1812
1813 Ok(subnet_ids)
1814}
1815
1816fn parse_vpc_security_group_ids(req: &AwsRequest) -> Vec<String> {
1817 let mut security_group_ids = Vec::new();
1818 for index in 1.. {
1819 let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
1820 match optional_param(req, &sg_id_name) {
1821 Some(sg_id) => security_group_ids.push(sg_id),
1822 None => break,
1823 }
1824 }
1825
1826 if security_group_ids.is_empty() {
1828 security_group_ids.push("sg-default".to_string());
1829 }
1830
1831 security_group_ids
1832}
1833
1834fn query_param_prefix_exists(req: &AwsRequest, prefix: &str) -> bool {
1835 req.query_params.keys().any(|key| key.starts_with(prefix))
1836}
1837
1838fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
1839 value
1840 .map(|raw| match raw {
1841 "true" | "True" | "TRUE" => Ok(true),
1842 "false" | "False" | "FALSE" => Ok(false),
1843 _ => Err(AwsServiceError::aws_error(
1844 StatusCode::BAD_REQUEST,
1845 "InvalidParameterValue",
1846 format!("Boolean parameter value '{raw}' is invalid."),
1847 )),
1848 })
1849 .transpose()
1850}
1851
1852struct PaginationResult<T> {
1853 items: Vec<T>,
1854 next_marker: Option<String>,
1855}
1856
1857fn paginate<T, F>(
1858 mut items: Vec<T>,
1859 marker: Option<String>,
1860 max_records: Option<String>,
1861 get_id: F,
1862) -> Result<PaginationResult<T>, AwsServiceError>
1863where
1864 F: Fn(&T) -> &str,
1865{
1866 let max = if let Some(max_str) = max_records {
1868 let parsed = max_str.parse::<i32>().map_err(|_| {
1869 AwsServiceError::aws_error(
1870 StatusCode::BAD_REQUEST,
1871 "InvalidParameterValue",
1872 "MaxRecords must be a valid integer.",
1873 )
1874 })?;
1875 if !(1..=100).contains(&parsed) {
1876 return Err(AwsServiceError::aws_error(
1877 StatusCode::BAD_REQUEST,
1878 "InvalidParameterValue",
1879 "MaxRecords must be between 1 and 100.",
1880 ));
1881 }
1882 parsed as usize
1883 } else {
1884 100
1885 };
1886
1887 let start_id = if let Some(encoded_marker) = marker {
1889 let decoded = BASE64.decode(encoded_marker.as_bytes()).map_err(|_| {
1890 AwsServiceError::aws_error(
1891 StatusCode::BAD_REQUEST,
1892 "InvalidParameterValue",
1893 "Marker is invalid.",
1894 )
1895 })?;
1896 let id = String::from_utf8(decoded).map_err(|_| {
1897 AwsServiceError::aws_error(
1898 StatusCode::BAD_REQUEST,
1899 "InvalidParameterValue",
1900 "Marker is invalid.",
1901 )
1902 })?;
1903 Some(id)
1904 } else {
1905 None
1906 };
1907
1908 let start_index = if let Some(ref start_id) = start_id {
1910 items
1911 .iter()
1912 .position(|item| get_id(item) == start_id)
1913 .map(|pos| pos + 1) .unwrap_or(items.len()) } else {
1916 0
1917 };
1918
1919 let total_items = items.len();
1921 let end_index = std::cmp::min(start_index + max, total_items);
1922 let paginated_items: Vec<T> = items.drain(start_index..end_index).collect();
1923
1924 let next_marker = if end_index < total_items {
1926 paginated_items
1927 .last()
1928 .map(|item| BASE64.encode(get_id(item).as_bytes()))
1929 } else {
1930 None
1931 };
1932
1933 Ok(PaginationResult {
1934 items: paginated_items,
1935 next_marker,
1936 })
1937}
1938
1939fn validate_create_request(
1940 db_instance_identifier: &str,
1941 allocated_storage: i32,
1942 db_instance_class: &str,
1943 engine: &str,
1944 engine_version: &str,
1945 port: i32,
1946) -> Result<(), AwsServiceError> {
1947 if allocated_storage <= 0 {
1948 return Err(AwsServiceError::aws_error(
1949 StatusCode::BAD_REQUEST,
1950 "InvalidParameterValue",
1951 "AllocatedStorage must be greater than zero.",
1952 ));
1953 }
1954 if port <= 0 {
1955 return Err(AwsServiceError::aws_error(
1956 StatusCode::BAD_REQUEST,
1957 "InvalidParameterValue",
1958 "Port must be greater than zero.",
1959 ));
1960 }
1961 if !db_instance_identifier
1962 .chars()
1963 .all(|ch| ch.is_ascii_alphanumeric() || ch == '-')
1964 {
1965 return Err(AwsServiceError::aws_error(
1966 StatusCode::BAD_REQUEST,
1967 "InvalidParameterValue",
1968 "DBInstanceIdentifier must contain only alphanumeric characters or hyphens.",
1969 ));
1970 }
1971 let supported_engines = ["postgres", "mysql", "mariadb"];
1973 if !supported_engines.contains(&engine) {
1974 return Err(AwsServiceError::aws_error(
1975 StatusCode::BAD_REQUEST,
1976 "InvalidParameterValue",
1977 format!("Engine '{}' is not supported.", engine),
1978 ));
1979 }
1980
1981 let supported_versions = match engine {
1983 "postgres" => vec!["16.3", "15.5", "14.10", "13.13"],
1984 "mysql" => vec!["8.0.35", "8.0.28", "5.7.44"],
1985 "mariadb" => vec!["10.11.6", "10.6.16"],
1986 _ => vec![],
1987 };
1988
1989 if !supported_versions.contains(&engine_version) {
1990 return Err(AwsServiceError::aws_error(
1991 StatusCode::BAD_REQUEST,
1992 "InvalidParameterValue",
1993 format!("EngineVersion '{engine_version}' is not supported yet."),
1994 ));
1995 }
1996 validate_db_instance_class(db_instance_class)?;
1997 Ok(())
1998}
1999
2000fn validate_db_instance_class(db_instance_class: &str) -> Result<(), AwsServiceError> {
2001 if !crate::state::SUPPORTED_INSTANCE_CLASSES.contains(&db_instance_class) {
2002 return Err(AwsServiceError::aws_error(
2003 StatusCode::BAD_REQUEST,
2004 "InvalidParameterValue",
2005 format!("DBInstanceClass '{}' is not supported.", db_instance_class),
2006 ));
2007 }
2008 Ok(())
2009}
2010
2011fn filter_engine_versions(
2012 versions: &[EngineVersionInfo],
2013 engine: &Option<String>,
2014 engine_version: &Option<String>,
2015 family: &Option<String>,
2016) -> Vec<EngineVersionInfo> {
2017 versions
2018 .iter()
2019 .filter(|candidate| {
2020 engine
2021 .as_ref()
2022 .is_none_or(|expected| candidate.engine == *expected)
2023 })
2024 .filter(|candidate| {
2025 engine_version
2026 .as_ref()
2027 .is_none_or(|expected| candidate.engine_version == *expected)
2028 })
2029 .filter(|candidate| {
2030 family
2031 .as_ref()
2032 .is_none_or(|expected| candidate.db_parameter_group_family == *expected)
2033 })
2034 .cloned()
2035 .collect()
2036}
2037
2038fn filter_orderable_options(
2039 options: &[OrderableDbInstanceOption],
2040 engine: &Option<String>,
2041 engine_version: &Option<String>,
2042 db_instance_class: &Option<String>,
2043 license_model: &Option<String>,
2044 vpc: Option<bool>,
2045) -> Vec<OrderableDbInstanceOption> {
2046 options
2047 .iter()
2048 .filter(|candidate| {
2049 engine
2050 .as_ref()
2051 .is_none_or(|expected| candidate.engine == *expected)
2052 })
2053 .filter(|candidate| {
2054 engine_version
2055 .as_ref()
2056 .is_none_or(|expected| candidate.engine_version == *expected)
2057 })
2058 .filter(|candidate| {
2059 db_instance_class
2060 .as_ref()
2061 .is_none_or(|expected| candidate.db_instance_class == *expected)
2062 })
2063 .filter(|candidate| {
2064 license_model
2065 .as_ref()
2066 .is_none_or(|expected| candidate.license_model == *expected)
2067 })
2068 .filter(|_| vpc.unwrap_or(true))
2069 .cloned()
2070 .collect()
2071}
2072
2073#[allow(clippy::too_many_arguments)]
2077fn build_restored_instance(
2081 db_instance_identifier: &str,
2082 db_instance_arn: String,
2083 dbi_resource_id: String,
2084 created_at: chrono::DateTime<Utc>,
2085 vpc_security_group_ids: Vec<String>,
2086 snapshot: &DbSnapshot,
2087 running: &crate::runtime::RunningDbContainer,
2088) -> DbInstance {
2089 DbInstance {
2090 db_instance_identifier: db_instance_identifier.to_string(),
2091 db_instance_arn,
2092 db_instance_class: "db.t3.micro".to_string(),
2093 engine: snapshot.engine.clone(),
2094 engine_version: snapshot.engine_version.clone(),
2095 db_instance_status: "available".to_string(),
2096 master_username: snapshot.master_username.clone(),
2097 db_name: snapshot.db_name.clone(),
2098 endpoint_address: "127.0.0.1".to_string(),
2099 port: i32::from(running.host_port),
2100 allocated_storage: snapshot.allocated_storage,
2101 publicly_accessible: true,
2102 deletion_protection: false,
2103 created_at,
2104 dbi_resource_id,
2105 master_user_password: snapshot.master_user_password.clone(),
2106 container_id: running.container_id.clone(),
2107 host_port: running.host_port,
2108 tags: Vec::new(),
2109 read_replica_source_db_instance_identifier: None,
2110 read_replica_db_instance_identifiers: Vec::new(),
2111 vpc_security_group_ids,
2112 db_parameter_group_name: None,
2113 backup_retention_period: 1,
2114 preferred_backup_window: "03:00-04:00".to_string(),
2115 latest_restorable_time: Some(created_at),
2116 option_group_name: None,
2117 multi_az: false,
2118 pending_modified_values: None,
2119 }
2120}
2121
2122fn build_read_replica_instance(
2123 db_instance_identifier: &str,
2124 db_instance_arn: String,
2125 dbi_resource_id: String,
2126 created_at: chrono::DateTime<Utc>,
2127 source_db_instance_identifier: &str,
2128 source: &DbInstance,
2129 running: &crate::runtime::RunningDbContainer,
2130) -> DbInstance {
2131 DbInstance {
2132 db_instance_identifier: db_instance_identifier.to_string(),
2133 db_instance_arn,
2134 db_instance_class: source.db_instance_class.clone(),
2135 engine: source.engine.clone(),
2136 engine_version: source.engine_version.clone(),
2137 db_instance_status: "available".to_string(),
2138 master_username: source.master_username.clone(),
2139 db_name: source.db_name.clone(),
2140 endpoint_address: "127.0.0.1".to_string(),
2141 port: i32::from(running.host_port),
2142 allocated_storage: source.allocated_storage,
2143 publicly_accessible: source.publicly_accessible,
2144 deletion_protection: false,
2145 created_at,
2146 dbi_resource_id,
2147 master_user_password: source.master_user_password.clone(),
2148 container_id: running.container_id.clone(),
2149 host_port: running.host_port,
2150 tags: Vec::new(),
2151 read_replica_source_db_instance_identifier: Some(source_db_instance_identifier.to_string()),
2152 read_replica_db_instance_identifiers: Vec::new(),
2153 vpc_security_group_ids: source.vpc_security_group_ids.clone(),
2154 db_parameter_group_name: source.db_parameter_group_name.clone(),
2155 backup_retention_period: source.backup_retention_period,
2156 preferred_backup_window: source.preferred_backup_window.clone(),
2157 latest_restorable_time: if source.backup_retention_period > 0 {
2158 Some(created_at)
2159 } else {
2160 None
2161 },
2162 option_group_name: source.option_group_name.clone(),
2163 multi_az: source.multi_az,
2164 pending_modified_values: None,
2165 }
2166}
2167
2168fn xml_wrap(action: &str, inner: &str, request_id: &str) -> String {
2169 fakecloud_core::query::query_response_xml(action, RDS_NS, inner, request_id)
2170}
2171
2172fn engine_version_xml(version: &EngineVersionInfo) -> String {
2173 format!(
2174 "<DBEngineVersion>\
2175 <Engine>{}</Engine>\
2176 <EngineVersion>{}</EngineVersion>\
2177 <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2178 <DBEngineDescription>{}</DBEngineDescription>\
2179 <DBEngineVersionDescription>{}</DBEngineVersionDescription>\
2180 <Status>{}</Status>\
2181 </DBEngineVersion>",
2182 xml_escape(&version.engine),
2183 xml_escape(&version.engine_version),
2184 xml_escape(&version.db_parameter_group_family),
2185 xml_escape(&version.db_engine_description),
2186 xml_escape(&version.db_engine_version_description),
2187 xml_escape(&version.status),
2188 )
2189}
2190
2191fn orderable_option_xml(option: &OrderableDbInstanceOption) -> String {
2192 format!(
2193 "<OrderableDBInstanceOption>\
2194 <Engine>{}</Engine>\
2195 <EngineVersion>{}</EngineVersion>\
2196 <DBInstanceClass>{}</DBInstanceClass>\
2197 <LicenseModel>{}</LicenseModel>\
2198 <AvailabilityZones><AvailabilityZone><Name>us-east-1a</Name></AvailabilityZone></AvailabilityZones>\
2199 <MultiAZCapable>true</MultiAZCapable>\
2200 <ReadReplicaCapable>true</ReadReplicaCapable>\
2201 <Vpc>true</Vpc>\
2202 <SupportsStorageEncryption>true</SupportsStorageEncryption>\
2203 <StorageType>{}</StorageType>\
2204 <SupportsIops>false</SupportsIops>\
2205 <MinStorageSize>{}</MinStorageSize>\
2206 <MaxStorageSize>{}</MaxStorageSize>\
2207 <SupportsIAMDatabaseAuthentication>true</SupportsIAMDatabaseAuthentication>\
2208 </OrderableDBInstanceOption>",
2209 xml_escape(&option.engine),
2210 xml_escape(&option.engine_version),
2211 xml_escape(&option.db_instance_class),
2212 xml_escape(&option.license_model),
2213 xml_escape(&option.storage_type),
2214 option.min_storage_size,
2215 option.max_storage_size,
2216 )
2217}
2218
2219fn tag_xml(tag: &RdsTag) -> String {
2220 format!(
2221 "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2222 xml_escape(&tag.key),
2223 xml_escape(&tag.value),
2224 )
2225}
2226
2227fn db_instance_xml(instance: &DbInstance, status_override: Option<&str>) -> String {
2228 let status = status_override.unwrap_or(&instance.db_instance_status);
2229 let db_name_xml = instance
2230 .db_name
2231 .as_ref()
2232 .map(|db_name| format!("<DBName>{}</DBName>", xml_escape(db_name)))
2233 .unwrap_or_default();
2234
2235 let read_replica_source_xml = instance
2236 .read_replica_source_db_instance_identifier
2237 .as_ref()
2238 .map(|source| {
2239 format!(
2240 "<ReadReplicaSourceDBInstanceIdentifier>{}</ReadReplicaSourceDBInstanceIdentifier>",
2241 xml_escape(source)
2242 )
2243 })
2244 .unwrap_or_default();
2245
2246 let read_replica_identifiers_xml = if instance.read_replica_db_instance_identifiers.is_empty() {
2247 "<ReadReplicaDBInstanceIdentifiers/>".to_string()
2248 } else {
2249 format!(
2250 "<ReadReplicaDBInstanceIdentifiers>{}</ReadReplicaDBInstanceIdentifiers>",
2251 instance
2252 .read_replica_db_instance_identifiers
2253 .iter()
2254 .map(|id| format!(
2255 "<ReadReplicaDBInstanceIdentifier>{}</ReadReplicaDBInstanceIdentifier>",
2256 xml_escape(id)
2257 ))
2258 .collect::<String>()
2259 )
2260 };
2261
2262 let vpc_security_groups_xml = if instance.vpc_security_group_ids.is_empty() {
2263 "<VpcSecurityGroups/>".to_string()
2264 } else {
2265 format!(
2266 "<VpcSecurityGroups>{}</VpcSecurityGroups>",
2267 instance
2268 .vpc_security_group_ids
2269 .iter()
2270 .map(|sg_id| format!(
2271 "<VpcSecurityGroupMembership>\
2272 <VpcSecurityGroupId>{}</VpcSecurityGroupId>\
2273 <Status>active</Status>\
2274 </VpcSecurityGroupMembership>",
2275 xml_escape(sg_id)
2276 ))
2277 .collect::<String>()
2278 )
2279 };
2280
2281 let db_parameter_groups_xml = match &instance.db_parameter_group_name {
2282 Some(pg_name) => format!(
2283 "<DBParameterGroups>\
2284 <DBParameterGroup>\
2285 <DBParameterGroupName>{}</DBParameterGroupName>\
2286 <ParameterApplyStatus>in-sync</ParameterApplyStatus>\
2287 </DBParameterGroup>\
2288 </DBParameterGroups>",
2289 xml_escape(pg_name)
2290 ),
2291 None => "<DBParameterGroups/>".to_string(),
2292 };
2293
2294 let option_group_memberships_xml = match &instance.option_group_name {
2295 Some(og_name) => format!(
2296 "<OptionGroupMemberships>\
2297 <OptionGroupMembership>\
2298 <OptionGroupName>{}</OptionGroupName>\
2299 <Status>in-sync</Status>\
2300 </OptionGroupMembership>\
2301 </OptionGroupMemberships>",
2302 xml_escape(og_name)
2303 ),
2304 None => "<OptionGroupMemberships/>".to_string(),
2305 };
2306
2307 let pending_modified_values_xml = if let Some(ref pending) = instance.pending_modified_values {
2308 let mut fields = Vec::new();
2309 if let Some(ref class) = pending.db_instance_class {
2310 fields.push(format!(
2311 "<DBInstanceClass>{}</DBInstanceClass>",
2312 xml_escape(class)
2313 ));
2314 }
2315 if let Some(allocated_storage) = pending.allocated_storage {
2316 fields.push(format!(
2317 "<AllocatedStorage>{}</AllocatedStorage>",
2318 allocated_storage
2319 ));
2320 }
2321 if let Some(backup_retention_period) = pending.backup_retention_period {
2322 fields.push(format!(
2323 "<BackupRetentionPeriod>{}</BackupRetentionPeriod>",
2324 backup_retention_period
2325 ));
2326 }
2327 if let Some(multi_az) = pending.multi_az {
2328 fields.push(format!(
2329 "<MultiAZ>{}</MultiAZ>",
2330 if multi_az { "true" } else { "false" }
2331 ));
2332 }
2333 if let Some(ref engine_version) = pending.engine_version {
2334 fields.push(format!(
2335 "<EngineVersion>{}</EngineVersion>",
2336 xml_escape(engine_version)
2337 ));
2338 }
2339 if pending.master_user_password.is_some() {
2340 fields.push("<MasterUserPassword>****</MasterUserPassword>".to_string());
2341 }
2342 if !fields.is_empty() {
2343 format!(
2344 "<PendingModifiedValues>{}</PendingModifiedValues>",
2345 fields.join("")
2346 )
2347 } else {
2348 String::new()
2349 }
2350 } else {
2351 String::new()
2352 };
2353
2354 let latest_restorable_time_xml = instance
2355 .latest_restorable_time
2356 .map(|t| {
2357 format!(
2358 "<LatestRestorableTime>{}</LatestRestorableTime>",
2359 t.to_rfc3339()
2360 )
2361 })
2362 .unwrap_or_default();
2363
2364 format!(
2365 "<DBInstanceIdentifier>{identifier}</DBInstanceIdentifier>\
2366 <DBInstanceClass>{class}</DBInstanceClass>\
2367 <Engine>{engine}</Engine>\
2368 <DBInstanceStatus>{status}</DBInstanceStatus>\
2369 <MasterUsername>{master_username}</MasterUsername>\
2370 {db_name_xml}\
2371 <Endpoint><Address>{endpoint_address}</Address><Port>{port}</Port></Endpoint>\
2372 <AllocatedStorage>{allocated_storage}</AllocatedStorage>\
2373 <InstanceCreateTime>{create_time}</InstanceCreateTime>\
2374 <PreferredBackupWindow>{preferred_backup_window}</PreferredBackupWindow>\
2375 <BackupRetentionPeriod>{backup_retention_period}</BackupRetentionPeriod>\
2376 <DBSecurityGroups/>\
2377 {vpc_security_groups_xml}\
2378 {db_parameter_groups_xml}\
2379 <AvailabilityZone>us-east-1a</AvailabilityZone>\
2380 {latest_restorable_time_xml}\
2381 <PreferredMaintenanceWindow>sun:00:00-sun:00:30</PreferredMaintenanceWindow>\
2382 <MultiAZ>{multi_az}</MultiAZ>\
2383 <EngineVersion>{engine_version}</EngineVersion>\
2384 <AutoMinorVersionUpgrade>true</AutoMinorVersionUpgrade>\
2385 {read_replica_identifiers_xml}\
2386 {read_replica_source_xml}\
2387 <LicenseModel>{license_model}</LicenseModel>\
2388 {option_group_memberships_xml}\
2389 <PubliclyAccessible>{publicly_accessible}</PubliclyAccessible>\
2390 <StorageType>gp2</StorageType>\
2391 <DbInstancePort>{port}</DbInstancePort>\
2392 <StorageEncrypted>false</StorageEncrypted>\
2393 <DbiResourceId>{dbi_resource_id}</DbiResourceId>\
2394 <DeletionProtection>{deletion_protection}</DeletionProtection>\
2395 {pending_modified_values_xml}\
2396 <DBInstanceArn>{arn}</DBInstanceArn>",
2397 identifier = xml_escape(&instance.db_instance_identifier),
2398 class = xml_escape(&instance.db_instance_class),
2399 engine = xml_escape(&instance.engine),
2400 status = xml_escape(status),
2401 master_username = xml_escape(&instance.master_username),
2402 endpoint_address = xml_escape(&instance.endpoint_address),
2403 port = instance.port,
2404 allocated_storage = instance.allocated_storage,
2405 create_time = instance.created_at.to_rfc3339(),
2406 preferred_backup_window = xml_escape(&instance.preferred_backup_window),
2407 backup_retention_period = instance.backup_retention_period,
2408 multi_az = if instance.multi_az { "true" } else { "false" },
2409 engine_version = xml_escape(&instance.engine_version),
2410 license_model = license_model_for_engine(&instance.engine),
2411 publicly_accessible = if instance.publicly_accessible {
2412 "true"
2413 } else {
2414 "false"
2415 },
2416 dbi_resource_id = xml_escape(&instance.dbi_resource_id),
2417 deletion_protection = if instance.deletion_protection {
2418 "true"
2419 } else {
2420 "false"
2421 },
2422 arn = xml_escape(&instance.db_instance_arn),
2423 )
2424}
2425
2426fn db_snapshot_xml(snapshot: &DbSnapshot) -> String {
2427 format!(
2428 "<DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\
2429 <DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2430 <SnapshotCreateTime>{}</SnapshotCreateTime>\
2431 <Engine>{}</Engine>\
2432 <EngineVersion>{}</EngineVersion>\
2433 <AllocatedStorage>{}</AllocatedStorage>\
2434 <Status>{}</Status>\
2435 <Port>{}</Port>\
2436 <MasterUsername>{}</MasterUsername>\
2437 {}\
2438 <DbiResourceId>{}</DbiResourceId>\
2439 <SnapshotType>{}</SnapshotType>\
2440 <DBSnapshotArn>{}</DBSnapshotArn>",
2441 xml_escape(&snapshot.db_snapshot_identifier),
2442 xml_escape(&snapshot.db_instance_identifier),
2443 snapshot.snapshot_create_time.to_rfc3339(),
2444 xml_escape(&snapshot.engine),
2445 xml_escape(&snapshot.engine_version),
2446 snapshot.allocated_storage,
2447 xml_escape(&snapshot.status),
2448 snapshot.port,
2449 xml_escape(&snapshot.master_username),
2450 snapshot
2451 .db_name
2452 .as_ref()
2453 .map(|name| format!("<DBName>{}</DBName>", xml_escape(name)))
2454 .unwrap_or_default(),
2455 xml_escape(&snapshot.dbi_resource_id),
2456 xml_escape(&snapshot.snapshot_type),
2457 xml_escape(&snapshot.db_snapshot_arn),
2458 )
2459}
2460
2461fn db_subnet_group_xml(subnet_group: &DbSubnetGroup) -> String {
2462 let subnets_xml = subnet_group
2463 .subnet_ids
2464 .iter()
2465 .zip(&subnet_group.subnet_availability_zones)
2466 .map(|(subnet_id, az)| {
2467 format!(
2468 "<Subnet>\
2469 <SubnetIdentifier>{}</SubnetIdentifier>\
2470 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
2471 <SubnetStatus>Active</SubnetStatus>\
2472 </Subnet>",
2473 xml_escape(subnet_id),
2474 xml_escape(az)
2475 )
2476 })
2477 .collect::<String>();
2478
2479 format!(
2480 "<DBSubnetGroupName>{}</DBSubnetGroupName>\
2481 <DBSubnetGroupDescription>{}</DBSubnetGroupDescription>\
2482 <VpcId>{}</VpcId>\
2483 <SubnetGroupStatus>Complete</SubnetGroupStatus>\
2484 <Subnets>{}</Subnets>\
2485 <DBSubnetGroupArn>{}</DBSubnetGroupArn>",
2486 xml_escape(&subnet_group.db_subnet_group_name),
2487 xml_escape(&subnet_group.db_subnet_group_description),
2488 xml_escape(&subnet_group.vpc_id),
2489 subnets_xml,
2490 xml_escape(&subnet_group.db_subnet_group_arn),
2491 )
2492}
2493
2494fn db_parameter_group_xml(parameter_group: &DbParameterGroup) -> String {
2495 format!(
2496 "<DBParameterGroupName>{}</DBParameterGroupName>\
2497 <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2498 <Description>{}</Description>\
2499 <DBParameterGroupArn>{}</DBParameterGroupArn>",
2500 xml_escape(¶meter_group.db_parameter_group_name),
2501 xml_escape(¶meter_group.db_parameter_group_family),
2502 xml_escape(¶meter_group.description),
2503 xml_escape(¶meter_group.db_parameter_group_arn),
2504 )
2505}
2506
2507fn db_instance_not_found(identifier: &str) -> AwsServiceError {
2508 AwsServiceError::aws_error(
2509 StatusCode::NOT_FOUND,
2510 "DBInstanceNotFound",
2511 format!("DBInstance {} not found.", identifier),
2512 )
2513}
2514
2515fn db_snapshot_not_found(identifier: &str) -> AwsServiceError {
2516 AwsServiceError::aws_error(
2517 StatusCode::NOT_FOUND,
2518 "DBSnapshotNotFound",
2519 format!("DBSnapshot {} not found.", identifier),
2520 )
2521}
2522
2523fn db_instance_not_found_by_arn(resource_name: &str) -> AwsServiceError {
2524 AwsServiceError::aws_error(
2525 StatusCode::NOT_FOUND,
2526 "DBInstanceNotFound",
2527 format!("DBInstance {resource_name} not found."),
2528 )
2529}
2530
2531fn find_instance_by_arn<'a>(
2532 state: &'a crate::state::RdsState,
2533 resource_name: &str,
2534) -> Result<&'a DbInstance, AwsServiceError> {
2535 state
2536 .instances
2537 .values()
2538 .find(|instance| instance.db_instance_arn == resource_name)
2539 .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2540}
2541
2542fn find_instance_by_arn_mut<'a>(
2543 state: &'a mut crate::state::RdsState,
2544 resource_name: &str,
2545) -> Result<&'a mut DbInstance, AwsServiceError> {
2546 state
2547 .instances
2548 .values_mut()
2549 .find(|instance| instance.db_instance_arn == resource_name)
2550 .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2551}
2552
2553fn merge_tags(existing: &mut Vec<RdsTag>, incoming: &[RdsTag]) {
2554 for tag in incoming {
2555 if let Some(existing_tag) = existing
2556 .iter_mut()
2557 .find(|candidate| candidate.key == tag.key)
2558 {
2559 existing_tag.value = tag.value.clone();
2560 } else {
2561 existing.push(tag.clone());
2562 }
2563 }
2564}
2565
2566fn license_model_for_engine(engine: &str) -> &'static str {
2567 match engine {
2568 "mysql" | "mariadb" => "general-public-license",
2569 _ => "postgresql-license",
2570 }
2571}
2572
2573fn default_db_name(engine: &str) -> &'static str {
2574 match engine {
2575 "mysql" | "mariadb" => "mysql",
2576 _ => "postgres",
2577 }
2578}
2579
2580fn default_port_for_engine(engine: &str) -> i32 {
2583 match engine {
2584 "postgres" => 5432,
2585 "mysql" | "mariadb" => 3306,
2586 _ => 5432,
2587 }
2588}
2589
2590fn default_parameter_group(engine: &str, engine_version: &str) -> String {
2595 match engine {
2596 "postgres" => {
2597 let major = engine_version.split('.').next().unwrap_or("16");
2598 format!("default.postgres{}", major)
2599 }
2600 "mysql" => {
2601 let major = if engine_version.starts_with("5.7") {
2602 "5.7"
2603 } else {
2604 "8.0"
2605 };
2606 format!("default.mysql{}", major)
2607 }
2608 "mariadb" => {
2609 let major = if engine_version.starts_with("10.11") {
2610 "10.11"
2611 } else {
2612 "10.6"
2613 };
2614 format!("default.mariadb{}", major)
2615 }
2616 _ => "default.postgres16".to_string(),
2617 }
2618}
2619
2620fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
2621 match error {
2622 RuntimeError::Unavailable => AwsServiceError::aws_error(
2623 StatusCode::SERVICE_UNAVAILABLE,
2624 "InvalidParameterValue",
2625 "Docker/Podman is required for RDS DB instances but is not available",
2626 ),
2627 RuntimeError::ContainerStartFailed(message) => AwsServiceError::aws_error(
2628 StatusCode::INTERNAL_SERVER_ERROR,
2629 "InternalFailure",
2630 message,
2631 ),
2632 }
2633}
2634
2635#[cfg(test)]
2636mod tests {
2637 use std::collections::HashMap;
2638 use std::sync::Arc;
2639
2640 use bytes::Bytes;
2641 use chrono::Utc;
2642 use http::{HeaderMap, Method};
2643 use parking_lot::RwLock;
2644 use uuid::Uuid;
2645
2646 use super::{
2647 db_instance_xml, filter_engine_versions, filter_orderable_options, merge_tags,
2648 optional_i32_param, parse_tag_keys, parse_tags, validate_create_request, RdsService,
2649 };
2650 use crate::state::{
2651 default_engine_versions, default_orderable_options, DbInstance, RdsState, RdsTag,
2652 };
2653 use fakecloud_core::service::{AwsRequest, AwsService};
2654
2655 #[test]
2656 fn filter_engine_versions_matches_requested_engine() {
2657 let versions = default_engine_versions();
2658
2659 let filtered =
2660 filter_engine_versions(&versions, &Some("postgres".to_string()), &None, &None);
2661
2662 assert_eq!(filtered.len(), 4); assert!(filtered.iter().all(|v| v.engine == "postgres"));
2664 }
2665
2666 #[test]
2667 fn filter_orderable_options_respects_instance_class() {
2668 let options = default_orderable_options();
2669
2670 let filtered = filter_orderable_options(
2671 &options,
2672 &Some("postgres".to_string()),
2673 &Some("16.3".to_string()),
2674 &Some("db.t3.micro".to_string()),
2675 &None,
2676 Some(true),
2677 );
2678
2679 assert_eq!(filtered.len(), 1);
2680 assert_eq!(filtered[0].db_instance_class, "db.t3.micro");
2681 }
2682
2683 #[test]
2684 fn validate_create_request_rejects_unsupported_engine() {
2685 let error = validate_create_request("test-db", 20, "db.t3.micro", "mysql", "16.3", 5432)
2686 .expect_err("unsupported engine");
2687
2688 assert_eq!(error.code(), "InvalidParameterValue");
2689 }
2690
2691 #[test]
2692 fn optional_i32_param_rejects_invalid_integer() {
2693 let request = request("CreateDBInstance", &[("Port", "not-a-number")]);
2694
2695 let error = optional_i32_param(&request, "Port").expect_err("invalid port");
2696
2697 assert_eq!(error.code(), "InvalidParameterValue");
2698 }
2699
2700 #[test]
2701 fn db_instance_xml_renders_endpoint_and_status() {
2702 let created_at = Utc::now();
2703 let instance = DbInstance {
2704 db_instance_identifier: "test-db".to_string(),
2705 db_instance_arn: "arn:aws:rds:us-east-1:123456789012:db:test-db".to_string(),
2706 db_instance_class: "db.t3.micro".to_string(),
2707 engine: "postgres".to_string(),
2708 engine_version: "16.3".to_string(),
2709 db_instance_status: "available".to_string(),
2710 master_username: "admin".to_string(),
2711 db_name: Some("appdb".to_string()),
2712 endpoint_address: "127.0.0.1".to_string(),
2713 port: 15432,
2714 allocated_storage: 20,
2715 publicly_accessible: true,
2716 deletion_protection: false,
2717 created_at,
2718 dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
2719 master_user_password: "secret123".to_string(),
2720 container_id: "container".to_string(),
2721 host_port: 15432,
2722 tags: Vec::new(),
2723 read_replica_source_db_instance_identifier: None,
2724 read_replica_db_instance_identifiers: Vec::new(),
2725 vpc_security_group_ids: vec!["sg-12345678".to_string()],
2726 db_parameter_group_name: Some("default.postgres16".to_string()),
2727 backup_retention_period: 1,
2728 preferred_backup_window: "03:00-04:00".to_string(),
2729 latest_restorable_time: Some(created_at),
2730 option_group_name: None,
2731 multi_az: false,
2732 pending_modified_values: None,
2733 };
2734
2735 let xml = db_instance_xml(&instance, Some("creating"));
2736
2737 assert!(xml.contains("<DBInstanceIdentifier>test-db</DBInstanceIdentifier>"));
2738 assert!(xml.contains("<DBInstanceStatus>creating</DBInstanceStatus>"));
2739 assert!(xml.contains("<Address>127.0.0.1</Address><Port>15432</Port>"));
2740 }
2741
2742 #[test]
2743 fn parse_tags_reads_rds_query_shape() {
2744 let request = request(
2745 "AddTagsToResource",
2746 &[
2747 ("Tags.Tag.1.Key", "env"),
2748 ("Tags.Tag.1.Value", "dev"),
2749 ("Tags.Tag.2.Key", "team"),
2750 ("Tags.Tag.2.Value", "core"),
2751 ],
2752 );
2753
2754 let tags = parse_tags(&request).expect("tags");
2755
2756 assert_eq!(
2757 tags,
2758 vec![
2759 RdsTag {
2760 key: "env".to_string(),
2761 value: "dev".to_string(),
2762 },
2763 RdsTag {
2764 key: "team".to_string(),
2765 value: "core".to_string(),
2766 }
2767 ]
2768 );
2769 }
2770
2771 #[test]
2772 fn parse_tag_keys_reads_member_shape() {
2773 let request = request(
2774 "RemoveTagsFromResource",
2775 &[("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")],
2776 );
2777
2778 let tag_keys = parse_tag_keys(&request).expect("tag keys");
2779
2780 assert_eq!(tag_keys, vec!["env".to_string(), "team".to_string()]);
2781 }
2782
2783 #[test]
2784 fn merge_tags_updates_existing_values() {
2785 let mut tags = vec![RdsTag {
2786 key: "env".to_string(),
2787 value: "dev".to_string(),
2788 }];
2789
2790 merge_tags(
2791 &mut tags,
2792 &[
2793 RdsTag {
2794 key: "env".to_string(),
2795 value: "prod".to_string(),
2796 },
2797 RdsTag {
2798 key: "team".to_string(),
2799 value: "core".to_string(),
2800 },
2801 ],
2802 );
2803
2804 assert_eq!(tags.len(), 2);
2805 assert_eq!(tags[0].value, "prod");
2806 assert_eq!(tags[1].key, "team");
2807 }
2808
2809 #[tokio::test]
2810 async fn describe_engine_versions_returns_xml_body() {
2811 let service = RdsService::new(Arc::new(RwLock::new(RdsState::new(
2812 "123456789012",
2813 "us-east-1",
2814 ))));
2815 let request = request("DescribeDBEngineVersions", &[("Engine", "postgres")]);
2816
2817 let response = service.handle(request).await.expect("response");
2818 let body = String::from_utf8(response.body.expect_bytes().to_vec()).expect("utf8");
2819
2820 assert!(body.contains("<DescribeDBEngineVersionsResponse"));
2821 assert!(body.contains("<Engine>postgres</Engine>"));
2822 assert!(body.contains("<DBParameterGroupFamily>postgres16</DBParameterGroupFamily>"));
2823 }
2824
2825 fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
2826 let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
2827 for (key, value) in params {
2828 query_params.insert((*key).to_string(), (*value).to_string());
2829 }
2830
2831 AwsRequest {
2832 service: "rds".to_string(),
2833 action: action.to_string(),
2834 region: "us-east-1".to_string(),
2835 account_id: "123456789012".to_string(),
2836 request_id: "test-request-id".to_string(),
2837 headers: HeaderMap::new(),
2838 query_params,
2839 body: Bytes::new(),
2840 path_segments: vec![],
2841 raw_path: "/".to_string(),
2842 raw_query: String::new(),
2843 method: Method::POST,
2844 is_query_protocol: true,
2845 access_key_id: None,
2846 }
2847 }
2848}