1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8
9use fakecloud_aws::xml::xml_escape;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11
12use crate::runtime::{RdsRuntime, RuntimeError};
13use crate::state::{
14 default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
15 DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsTag, SharedRdsState,
16};
17
/// XML namespace URI for the RDS API dated 2014-10-31.
/// NOTE(review): presumably stamped on response envelopes; the use site is
/// outside this excerpt — confirm.
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
/// Action names returned by `RdsService::supported_actions`.
///
/// Every entry has a matching arm in `RdsService::handle`; keep the two in
/// sync when adding an action.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddTagsToResource",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "DeleteDBInstance",
    "DeleteDBParameterGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DescribeDBEngineVersions",
    "DescribeDBInstances",
    "DescribeDBParameterGroups",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeOrderableDBInstanceOptions",
    "ListTagsForResource",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBSubnetGroup",
    "RebootDBInstance",
    "RemoveTagsFromResource",
    "RestoreDBInstanceFromDBSnapshot",
];
44
/// RDS service handler: dispatches RDS actions against shared state and,
/// when available, a container runtime.
pub struct RdsService {
    /// Shared RDS state, accessed through its `read()` / `write()` guards.
    state: SharedRdsState,
    /// Container runtime; `None` until `with_runtime` is called. Handlers
    /// that need a running database answer with a 503 while it is absent.
    runtime: Option<Arc<RdsRuntime>>,
}
49
50impl RdsService {
51 pub fn new(state: SharedRdsState) -> Self {
52 Self {
53 state,
54 runtime: None,
55 }
56 }
57
58 pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
59 self.runtime = Some(runtime);
60 self
61 }
62}
63
64#[async_trait]
65impl AwsService for RdsService {
66 fn service_name(&self) -> &str {
67 "rds"
68 }
69
70 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
71 match request.action.as_str() {
72 "AddTagsToResource" => self.add_tags_to_resource(&request),
73 "CreateDBInstance" => self.create_db_instance(&request).await,
74 "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
75 "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
76 "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
77 "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
78 "DeleteDBInstance" => self.delete_db_instance(&request).await,
79 "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
80 "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
81 "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
82 "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
83 "DescribeDBInstances" => self.describe_db_instances(&request),
84 "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
85 "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
86 "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
87 "DescribeOrderableDBInstanceOptions" => {
88 self.describe_orderable_db_instance_options(&request)
89 }
90 "ListTagsForResource" => self.list_tags_for_resource(&request),
91 "ModifyDBInstance" => self.modify_db_instance(&request),
92 "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
93 "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
94 "RebootDBInstance" => self.reboot_db_instance(&request).await,
95 "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
96 "RestoreDBInstanceFromDBSnapshot" => {
97 self.restore_db_instance_from_db_snapshot(&request).await
98 }
99 _ => Err(AwsServiceError::action_not_implemented(
100 self.service_name(),
101 &request.action,
102 )),
103 }
104 }
105
106 fn supported_actions(&self) -> &[&str] {
107 SUPPORTED_ACTIONS
108 }
109}
110
111impl RdsService {
112 async fn create_db_instance(
113 &self,
114 request: &AwsRequest,
115 ) -> Result<AwsResponse, AwsServiceError> {
116 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
117 let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
118 let db_instance_class = required_param(request, "DBInstanceClass")?;
119 let engine = required_param(request, "Engine")?;
120 let master_username = required_param(request, "MasterUsername")?;
121 let master_user_password = required_param(request, "MasterUserPassword")?;
122 let db_name = optional_param(request, "DBName");
123 let engine_version =
124 optional_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
125 let publicly_accessible =
126 parse_optional_bool(optional_param(request, "PubliclyAccessible").as_deref())?
127 .unwrap_or(true);
128 let deletion_protection =
129 parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?
130 .unwrap_or(false);
131 let default_port = match engine.as_str() {
133 "postgres" => 5432,
134 "mysql" | "mariadb" => 3306,
135 _ => 5432,
136 };
137 let port = optional_i32_param(request, "Port")?.unwrap_or(default_port);
138 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
139
140 let default_param_group = match engine.as_str() {
142 "postgres" => {
143 let major = engine_version.split('.').next().unwrap_or("16");
144 format!("default.postgres{}", major)
145 }
146 "mysql" => {
147 let major = if engine_version.starts_with("5.7") {
148 "5.7"
149 } else {
150 "8.0"
151 };
152 format!("default.mysql{}", major)
153 }
154 "mariadb" => {
155 let major = if engine_version.starts_with("10.11") {
156 "10.11"
157 } else {
158 "10.6"
159 };
160 format!("default.mariadb{}", major)
161 }
162 _ => "default.postgres16".to_string(),
163 };
164 let db_parameter_group_name =
165 optional_param(request, "DBParameterGroupName").or(Some(default_param_group));
166
167 let backup_retention_period =
168 optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
169 let preferred_backup_window = optional_param(request, "PreferredBackupWindow")
170 .unwrap_or_else(|| "03:00-04:00".to_string());
171 let option_group_name = optional_param(request, "OptionGroupName");
172 let multi_az =
173 parse_optional_bool(optional_param(request, "MultiAZ").as_deref())?.unwrap_or(false);
174
175 validate_create_request(
176 &db_instance_identifier,
177 allocated_storage,
178 &db_instance_class,
179 &engine,
180 &engine_version,
181 port,
182 )?;
183
184 {
185 let mut state = self.state.write();
186 if !state.begin_instance_creation(&db_instance_identifier) {
187 return Err(AwsServiceError::aws_error(
188 StatusCode::BAD_REQUEST,
189 "DBInstanceAlreadyExists",
190 format!("DBInstance {} already exists.", db_instance_identifier),
191 ));
192 }
193 }
194
195 let runtime = self.runtime.as_ref().ok_or_else(|| {
196 AwsServiceError::aws_error(
197 StatusCode::SERVICE_UNAVAILABLE,
198 "InvalidParameterValue",
199 "Docker/Podman is required for RDS DB instances but is not available",
200 )
201 })?;
202
203 let logical_db_name = db_name.clone().unwrap_or_else(|| match engine.as_str() {
205 "postgres" => "postgres".to_string(),
206 "mysql" | "mariadb" => "mysql".to_string(),
207 _ => "postgres".to_string(),
208 });
209 let running = runtime
210 .ensure_postgres(
211 &db_instance_identifier,
212 &engine,
213 &engine_version,
214 &master_username,
215 &master_user_password,
216 &logical_db_name,
217 )
218 .await
219 .map_err(|error| {
220 self.state
221 .write()
222 .cancel_instance_creation(&db_instance_identifier);
223 runtime_error_to_service_error(error)
224 })?;
225
226 let mut state = self.state.write();
227 let created_at = Utc::now();
228 let instance = DbInstance {
229 db_instance_identifier: db_instance_identifier.clone(),
230 db_instance_arn: state.db_instance_arn(&db_instance_identifier),
231 db_instance_class: db_instance_class.clone(),
232 engine: engine.clone(),
233 engine_version: engine_version.clone(),
234 db_instance_status: "available".to_string(),
235 master_username: master_username.clone(),
236 db_name: db_name.clone(),
237 endpoint_address: "127.0.0.1".to_string(),
238 port: i32::from(running.host_port),
239 allocated_storage,
240 publicly_accessible,
241 deletion_protection,
242 created_at,
243 dbi_resource_id: state.next_dbi_resource_id(),
244 master_user_password,
245 container_id: running.container_id,
246 host_port: running.host_port,
247 tags: Vec::new(),
248 read_replica_source_db_instance_identifier: None,
249 read_replica_db_instance_identifiers: Vec::new(),
250 vpc_security_group_ids,
251 db_parameter_group_name,
252 backup_retention_period,
253 preferred_backup_window,
254 latest_restorable_time: created_at,
255 option_group_name,
256 multi_az,
257 pending_modified_values: None,
258 };
259 state.finish_instance_creation(instance.clone());
260
261 Ok(AwsResponse::xml(
262 StatusCode::OK,
263 xml_wrap(
264 "CreateDBInstance",
265 &format!(
266 "<DBInstance>{}</DBInstance>",
267 db_instance_xml(&instance, Some("creating"))
268 ),
269 &request.request_id,
270 ),
271 ))
272 }
273
    /// Handles `DeleteDBInstance`.
    ///
    /// Optionally takes a final snapshot, removes the instance from state,
    /// unlinks it from its replication source and replicas, and stops its
    /// container when a runtime is configured. The response reports the
    /// removed instance with status `deleting`.
    ///
    /// # Errors
    ///
    /// - `InvalidParameterCombination` (400) when `SkipFinalSnapshot` and
    ///   `FinalDBSnapshotIdentifier` contradict each other, or when neither
    ///   allows the delete to proceed;
    /// - `InvalidDBInstanceState` (400) when deletion protection is enabled;
    /// - the "instance not found" error when the identifier is unknown;
    /// - `InvalidParameterValue` (503) when a final snapshot is requested but
    ///   no runtime is configured;
    /// - `DBSnapshotAlreadyExists` (409) when the final snapshot id is taken;
    /// - the mapped runtime error when the database dump fails.
    async fn delete_db_instance(
        &self,
        request: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
        // An absent SkipFinalSnapshot is treated as `false`.
        let skip_final_snapshot =
            parse_optional_bool(optional_param(request, "SkipFinalSnapshot").as_deref())?
                .unwrap_or(false);
        let final_db_snapshot_identifier = optional_param(request, "FinalDBSnapshotIdentifier");

        if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "InvalidParameterCombination",
                "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
            ));
        }
        if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "InvalidParameterCombination",
                "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
            ));
        }

        // Existence and deletion-protection check under a short read lock.
        {
            let state = self.state.read();
            if let Some(instance) = state.instances.get(&db_instance_identifier) {
                if instance.deletion_protection {
                    return Err(AwsServiceError::aws_error(
                        StatusCode::BAD_REQUEST,
                        "InvalidDBInstanceState",
                        format!(
                            "DBInstance {} cannot be deleted because deletion protection is enabled.",
                            db_instance_identifier
                        ),
                    ));
                }
            } else {
                return Err(db_instance_not_found(&db_instance_identifier));
            }
        }

        // Final snapshot, taken before the instance is removed.
        if let Some(ref snapshot_id) = final_db_snapshot_identifier {
            let runtime = self.runtime.as_ref().ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::SERVICE_UNAVAILABLE,
                    "InvalidParameterValue",
                    "Docker/Podman is required for RDS snapshots but is not available",
                )
            })?;

            // Copy what the dump needs so no lock is held across the await.
            let (instance_for_snapshot, db_name) = {
                let state = self.state.read();

                if state.snapshots.contains_key(snapshot_id) {
                    return Err(AwsServiceError::aws_error(
                        StatusCode::CONFLICT,
                        "DBSnapshotAlreadyExists",
                        format!("DBSnapshot {snapshot_id} already exists."),
                    ));
                }

                let instance = state
                    .instances
                    .get(&db_instance_identifier)
                    .cloned()
                    .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;

                // NOTE(review): falls back to "postgres" for every engine,
                // whereas create_db_instance falls back to "mysql" for
                // mysql/mariadb — confirm the runtime tolerates this.
                let db_name = instance
                    .db_name
                    .as_deref()
                    .unwrap_or("postgres")
                    .to_string();

                (instance, db_name)
            };

            let dump_data = runtime
                .dump_database(
                    &db_instance_identifier,
                    &instance_for_snapshot.master_username,
                    &db_name,
                )
                .await
                .map_err(runtime_error_to_service_error)?;

            let mut state = self.state.write();

            // Re-check: the lock was released during the dump, so the id may
            // have been taken in the meantime.
            if state.snapshots.contains_key(snapshot_id) {
                return Err(AwsServiceError::aws_error(
                    StatusCode::CONFLICT,
                    "DBSnapshotAlreadyExists",
                    format!("DBSnapshot {snapshot_id} already exists."),
                ));
            }

            let snapshot_arn = state.db_snapshot_arn(snapshot_id);

            let snapshot = DbSnapshot {
                db_snapshot_identifier: snapshot_id.clone(),
                db_snapshot_arn: snapshot_arn,
                db_instance_identifier: db_instance_identifier.clone(),
                snapshot_create_time: Utc::now(),
                engine: instance_for_snapshot.engine.clone(),
                engine_version: instance_for_snapshot.engine_version.clone(),
                allocated_storage: instance_for_snapshot.allocated_storage,
                status: "available".to_string(),
                port: instance_for_snapshot.port,
                master_username: instance_for_snapshot.master_username.clone(),
                db_name: instance_for_snapshot.db_name.clone(),
                dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
                snapshot_type: "manual".to_string(),
                master_user_password: instance_for_snapshot.master_user_password.clone(),
                tags: Vec::new(),
                dump_data,
            };

            state.snapshots.insert(snapshot_id.clone(), snapshot);
        }

        // Remove the instance and repair replication links in one write lock.
        let instance = {
            let mut state = self.state.write();
            let instance = state
                .instances
                .remove(&db_instance_identifier)
                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;

            // If this was a replica, drop it from its source's replica list.
            if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
                if let Some(source) = state.instances.get_mut(source_id) {
                    source
                        .read_replica_db_instance_identifiers
                        .retain(|id| id != &db_instance_identifier);
                }
            }

            // If this was a source, its replicas no longer point at it.
            for replica_id in &instance.read_replica_db_instance_identifiers {
                if let Some(replica) = state.instances.get_mut(replica_id) {
                    replica.read_replica_source_db_instance_identifier = None;
                }
            }

            instance
        };

        // Stopping the container is skipped when no runtime is configured.
        if let Some(runtime) = &self.runtime {
            runtime.stop_container(&db_instance_identifier).await;
        }

        Ok(AwsResponse::xml(
            StatusCode::OK,
            xml_wrap(
                "DeleteDBInstance",
                &format!(
                    "<DBInstance>{}</DBInstance>",
                    db_instance_xml(&instance, Some("deleting"))
                ),
                &request.request_id,
            ),
        ))
    }
437
438 fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
439 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
440 let db_instance_class = optional_param(request, "DBInstanceClass");
441 let deletion_protection =
442 parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?;
443 let apply_immediately =
444 parse_optional_bool(optional_param(request, "ApplyImmediately").as_deref())?;
445
446 let vpc_security_group_ids = {
448 let mut ids = Vec::new();
449 for index in 1.. {
450 let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
451 match optional_param(request, &sg_id_name) {
452 Some(sg_id) => ids.push(sg_id),
453 None => break,
454 }
455 }
456 if ids.is_empty() {
457 None
458 } else {
459 Some(ids)
460 }
461 };
462
463 if db_instance_class.is_none()
464 && deletion_protection.is_none()
465 && vpc_security_group_ids.is_none()
466 {
467 return Err(AwsServiceError::aws_error(
468 StatusCode::BAD_REQUEST,
469 "InvalidParameterCombination",
470 "At least one supported mutable field must be provided.",
471 ));
472 }
473 if let Some(ref class) = db_instance_class {
474 validate_db_instance_class(class)?;
475 }
476
477 let mut state = self.state.write();
478 let instance = state
479 .instances
480 .get_mut(&db_instance_identifier)
481 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
482
483 if apply_immediately == Some(false) {
485 let pending = instance
486 .pending_modified_values
487 .get_or_insert(Default::default());
488 if let Some(class) = db_instance_class {
489 pending.db_instance_class = Some(class);
490 }
491 if let Some(deletion_protection) = deletion_protection {
494 instance.deletion_protection = deletion_protection;
495 }
496 if let Some(security_group_ids) = vpc_security_group_ids {
497 instance.vpc_security_group_ids = security_group_ids;
498 }
499 } else {
500 if let Some(class) = db_instance_class {
502 instance.db_instance_class = class;
503 }
504 if let Some(deletion_protection) = deletion_protection {
505 instance.deletion_protection = deletion_protection;
506 }
507 if let Some(security_group_ids) = vpc_security_group_ids {
508 instance.vpc_security_group_ids = security_group_ids;
509 }
510 }
511
512 Ok(AwsResponse::xml(
513 StatusCode::OK,
514 xml_wrap(
515 "ModifyDBInstance",
516 &format!(
517 "<DBInstance>{}</DBInstance>",
518 db_instance_xml(instance, Some("modifying"))
519 ),
520 &request.request_id,
521 ),
522 ))
523 }
524
525 async fn reboot_db_instance(
526 &self,
527 request: &AwsRequest,
528 ) -> Result<AwsResponse, AwsServiceError> {
529 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
530 let force_failover =
531 parse_optional_bool(optional_param(request, "ForceFailover").as_deref())?;
532 if force_failover == Some(true) {
533 return Err(AwsServiceError::aws_error(
534 StatusCode::BAD_REQUEST,
535 "InvalidParameterCombination",
536 "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
537 ));
538 }
539
540 let instance = {
541 let state = self.state.read();
542 state
543 .instances
544 .get(&db_instance_identifier)
545 .cloned()
546 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
547 };
548
549 let runtime = self.runtime.as_ref().ok_or_else(|| {
550 AwsServiceError::aws_error(
551 StatusCode::SERVICE_UNAVAILABLE,
552 "InvalidParameterValue",
553 "Docker/Podman is required for RDS DB instances but is not available",
554 )
555 })?;
556
557 let running = runtime
558 .restart_container(
559 &db_instance_identifier,
560 &instance.engine,
561 &instance.master_username,
562 &instance.master_user_password,
563 instance.db_name.as_deref().unwrap_or("postgres"),
564 )
565 .await
566 .map_err(runtime_error_to_service_error)?;
567
568 let instance = {
569 let mut state = self.state.write();
570 let instance = state
571 .instances
572 .get_mut(&db_instance_identifier)
573 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
574 instance.host_port = running.host_port;
575 instance.port = i32::from(running.host_port);
576
577 if let Some(pending) = instance.pending_modified_values.take() {
579 if let Some(class) = pending.db_instance_class {
580 instance.db_instance_class = class;
581 }
582 if let Some(allocated_storage) = pending.allocated_storage {
583 instance.allocated_storage = allocated_storage;
584 }
585 if let Some(backup_retention_period) = pending.backup_retention_period {
586 instance.backup_retention_period = backup_retention_period;
587 }
588 if let Some(multi_az) = pending.multi_az {
589 instance.multi_az = multi_az;
590 }
591 if let Some(engine_version) = pending.engine_version {
592 instance.engine_version = engine_version;
593 }
594 if let Some(master_user_password) = pending.master_user_password {
595 instance.master_user_password = master_user_password;
596 }
597 }
598
599 instance.clone()
600 };
601
602 Ok(AwsResponse::xml(
603 StatusCode::OK,
604 xml_wrap(
605 "RebootDBInstance",
606 &format!(
607 "<DBInstance>{}</DBInstance>",
608 db_instance_xml(&instance, Some("rebooting"))
609 ),
610 &request.request_id,
611 ),
612 ))
613 }
614
615 fn describe_db_engine_versions(
616 &self,
617 request: &AwsRequest,
618 ) -> Result<AwsResponse, AwsServiceError> {
619 let engine = optional_param(request, "Engine");
620 let engine_version = optional_param(request, "EngineVersion");
621 let family = optional_param(request, "DBParameterGroupFamily");
622 let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
623
624 let mut versions = filter_engine_versions(
625 &default_engine_versions(),
626 &engine,
627 &engine_version,
628 &family,
629 );
630
631 if default_only.unwrap_or(false) {
632 versions.truncate(1);
633 }
634
635 Ok(AwsResponse::xml(
636 StatusCode::OK,
637 xml_wrap(
638 "DescribeDBEngineVersions",
639 &format!(
640 "<DBEngineVersions>{}</DBEngineVersions>",
641 versions.iter().map(engine_version_xml).collect::<String>()
642 ),
643 &request.request_id,
644 ),
645 ))
646 }
647
648 fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
649 let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
650 let marker = optional_param(request, "Marker");
651 let max_records = optional_param(request, "MaxRecords");
652
653 let state = self.state.read();
654
655 if let Some(identifier) = db_instance_identifier {
657 let instance = state
658 .instances
659 .get(&identifier)
660 .cloned()
661 .ok_or_else(|| db_instance_not_found(&identifier))?;
662
663 return Ok(AwsResponse::xml(
664 StatusCode::OK,
665 xml_wrap(
666 "DescribeDBInstances",
667 &format!(
668 "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
669 db_instance_xml(&instance, None)
670 ),
671 &request.request_id,
672 ),
673 ));
674 }
675
676 let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
678 instances.sort_by(|a, b| {
679 a.created_at
680 .cmp(&b.created_at)
681 .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
682 });
683
684 let paginated = paginate(instances, marker, max_records, |inst| {
686 &inst.db_instance_identifier
687 })?;
688
689 let marker_xml = paginated
690 .next_marker
691 .as_ref()
692 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
693 .unwrap_or_default();
694
695 Ok(AwsResponse::xml(
696 StatusCode::OK,
697 xml_wrap(
698 "DescribeDBInstances",
699 &format!(
700 "<DBInstances>{}</DBInstances>{}",
701 paginated
702 .items
703 .iter()
704 .map(|instance| {
705 format!(
706 "<DBInstance>{}</DBInstance>",
707 db_instance_xml(instance, None)
708 )
709 })
710 .collect::<String>(),
711 marker_xml
712 ),
713 &request.request_id,
714 ),
715 ))
716 }
717
718 fn describe_orderable_db_instance_options(
719 &self,
720 request: &AwsRequest,
721 ) -> Result<AwsResponse, AwsServiceError> {
722 let engine = optional_param(request, "Engine");
723 let engine_version = optional_param(request, "EngineVersion");
724 let db_instance_class = optional_param(request, "DBInstanceClass");
725 let license_model = optional_param(request, "LicenseModel");
726 let vpc = parse_optional_bool(optional_param(request, "Vpc").as_deref())?;
727
728 let options = filter_orderable_options(
729 &default_orderable_options(),
730 &engine,
731 &engine_version,
732 &db_instance_class,
733 &license_model,
734 vpc,
735 );
736
737 Ok(AwsResponse::xml(
738 StatusCode::OK,
739 xml_wrap(
740 "DescribeOrderableDBInstanceOptions",
741 &format!(
742 "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
743 options.iter().map(orderable_option_xml).collect::<String>()
744 ),
745 &request.request_id,
746 ),
747 ))
748 }
749
750 fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
751 let resource_name = required_param(request, "ResourceName")?;
752 let tags = parse_tags(request)?;
753
754 if tags.is_empty() {
755 return Err(AwsServiceError::aws_error(
756 StatusCode::BAD_REQUEST,
757 "MissingParameter",
758 "The request must contain the parameter Tags.",
759 ));
760 }
761
762 let mut state = self.state.write();
763 let instance = find_instance_by_arn_mut(&mut state, &resource_name)?;
764 merge_tags(&mut instance.tags, &tags);
765
766 Ok(AwsResponse::xml(
767 StatusCode::OK,
768 xml_wrap("AddTagsToResource", "", &request.request_id),
769 ))
770 }
771
772 fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
773 let resource_name = required_param(request, "ResourceName")?;
774 if query_param_prefix_exists(request, "Filters.") {
775 return Err(AwsServiceError::aws_error(
776 StatusCode::BAD_REQUEST,
777 "InvalidParameterValue",
778 "Filters are not yet supported for ListTagsForResource.",
779 ));
780 }
781
782 let state = self.state.read();
783 let instance = find_instance_by_arn(&state, &resource_name)?;
784 let tag_xml = instance.tags.iter().map(tag_xml).collect::<String>();
785
786 Ok(AwsResponse::xml(
787 StatusCode::OK,
788 xml_wrap(
789 "ListTagsForResource",
790 &format!("<TagList>{tag_xml}</TagList>"),
791 &request.request_id,
792 ),
793 ))
794 }
795
796 fn remove_tags_from_resource(
797 &self,
798 request: &AwsRequest,
799 ) -> Result<AwsResponse, AwsServiceError> {
800 let resource_name = required_param(request, "ResourceName")?;
801 let tag_keys = parse_tag_keys(request)?;
802
803 if tag_keys.is_empty() {
804 return Err(AwsServiceError::aws_error(
805 StatusCode::BAD_REQUEST,
806 "MissingParameter",
807 "The request must contain the parameter TagKeys.",
808 ));
809 }
810
811 let mut state = self.state.write();
812 let instance = find_instance_by_arn_mut(&mut state, &resource_name)?;
813 instance
814 .tags
815 .retain(|tag| !tag_keys.iter().any(|key| key == &tag.key));
816
817 Ok(AwsResponse::xml(
818 StatusCode::OK,
819 xml_wrap("RemoveTagsFromResource", "", &request.request_id),
820 ))
821 }
822
823 async fn create_db_snapshot(
824 &self,
825 request: &AwsRequest,
826 ) -> Result<AwsResponse, AwsServiceError> {
827 let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
828 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
829
830 let runtime = self.runtime.as_ref().ok_or_else(|| {
831 AwsServiceError::aws_error(
832 StatusCode::SERVICE_UNAVAILABLE,
833 "InvalidParameterValue",
834 "Docker/Podman is required for RDS snapshots but is not available",
835 )
836 })?;
837
838 let (instance, db_name) = {
839 let state = self.state.write();
840
841 if state.snapshots.contains_key(&db_snapshot_identifier) {
842 return Err(AwsServiceError::aws_error(
843 StatusCode::CONFLICT,
844 "DBSnapshotAlreadyExists",
845 format!("DBSnapshot {db_snapshot_identifier} already exists."),
846 ));
847 }
848
849 let instance = state
850 .instances
851 .get(&db_instance_identifier)
852 .cloned()
853 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
854
855 let db_name = instance
856 .db_name
857 .as_deref()
858 .unwrap_or("postgres")
859 .to_string();
860
861 (instance, db_name)
862 };
863
864 let dump_data = runtime
865 .dump_database(&db_instance_identifier, &instance.master_username, &db_name)
866 .await
867 .map_err(runtime_error_to_service_error)?;
868
869 let mut state = self.state.write();
870
871 if state.snapshots.contains_key(&db_snapshot_identifier) {
872 return Err(AwsServiceError::aws_error(
873 StatusCode::CONFLICT,
874 "DBSnapshotAlreadyExists",
875 format!("DBSnapshot {db_snapshot_identifier} already exists."),
876 ));
877 }
878
879 let snapshot = DbSnapshot {
880 db_snapshot_identifier: db_snapshot_identifier.clone(),
881 db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
882 db_instance_identifier: instance.db_instance_identifier.clone(),
883 snapshot_create_time: Utc::now(),
884 engine: instance.engine.clone(),
885 engine_version: instance.engine_version.clone(),
886 allocated_storage: instance.allocated_storage,
887 status: "available".to_string(),
888 port: instance.port,
889 master_username: instance.master_username.clone(),
890 db_name: instance.db_name.clone(),
891 dbi_resource_id: instance.dbi_resource_id.clone(),
892 snapshot_type: "manual".to_string(),
893 master_user_password: instance.master_user_password.clone(),
894 tags: Vec::new(),
895 dump_data,
896 };
897
898 state
899 .snapshots
900 .insert(db_snapshot_identifier, snapshot.clone());
901
902 Ok(AwsResponse::xml(
903 StatusCode::OK,
904 xml_wrap(
905 "CreateDBSnapshot",
906 &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
907 &request.request_id,
908 ),
909 ))
910 }
911
912 fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
913 let db_snapshot_identifier = optional_param(request, "DBSnapshotIdentifier");
914 let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
915 let marker = optional_param(request, "Marker");
916 let max_records = optional_param(request, "MaxRecords");
917
918 if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
919 return Err(AwsServiceError::aws_error(
920 StatusCode::BAD_REQUEST,
921 "InvalidParameterCombination",
922 "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
923 ));
924 }
925
926 let state = self.state.read();
927
928 if let Some(snapshot_id) = db_snapshot_identifier {
930 let snapshot = state
931 .snapshots
932 .get(&snapshot_id)
933 .cloned()
934 .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
935
936 return Ok(AwsResponse::xml(
937 StatusCode::OK,
938 xml_wrap(
939 "DescribeDBSnapshots",
940 &format!(
941 "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
942 db_snapshot_xml(&snapshot)
943 ),
944 &request.request_id,
945 ),
946 ));
947 }
948
949 let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
951 state
952 .snapshots
953 .values()
954 .filter(|s| s.db_instance_identifier == instance_id)
955 .cloned()
956 .collect()
957 } else {
958 state.snapshots.values().cloned().collect()
959 };
960
961 snapshots.sort_by(|a, b| {
963 a.snapshot_create_time
964 .cmp(&b.snapshot_create_time)
965 .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
966 });
967
968 let paginated = paginate(snapshots, marker, max_records, |snap| {
970 &snap.db_snapshot_identifier
971 })?;
972
973 let marker_xml = paginated
974 .next_marker
975 .as_ref()
976 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
977 .unwrap_or_default();
978
979 Ok(AwsResponse::xml(
980 StatusCode::OK,
981 xml_wrap(
982 "DescribeDBSnapshots",
983 &format!(
984 "<DBSnapshots>{}</DBSnapshots>{}",
985 paginated
986 .items
987 .iter()
988 .map(|snapshot| format!(
989 "<DBSnapshot>{}</DBSnapshot>",
990 db_snapshot_xml(snapshot)
991 ))
992 .collect::<String>(),
993 marker_xml
994 ),
995 &request.request_id,
996 ),
997 ))
998 }
999
1000 fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1001 let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1002
1003 let mut state = self.state.write();
1004
1005 let snapshot = state
1006 .snapshots
1007 .remove(&db_snapshot_identifier)
1008 .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1009
1010 Ok(AwsResponse::xml(
1011 StatusCode::OK,
1012 xml_wrap(
1013 "DeleteDBSnapshot",
1014 &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1015 &request.request_id,
1016 ),
1017 ))
1018 }
1019
/// Handles `RestoreDBInstanceFromDBSnapshot`: creates a new DB instance from a stored snapshot.
///
/// Reads the required `DBInstanceIdentifier` and `DBSnapshotIdentifier` parameters, reserves the
/// new identifier in shared state, starts a container through the runtime, loads the snapshot's
/// dump into it and finally registers the instance.
///
/// # Errors
///
/// - `MissingParameter` when a required parameter is absent.
/// - `InvalidParameterValue` (HTTP 503) when no container runtime is configured.
/// - `DBInstanceAlreadyExists` (HTTP 409) when the identifier cannot be reserved.
/// - The error built by `db_snapshot_not_found` when the snapshot is unknown.
/// - The converted runtime error when starting the container or restoring the dump fails.
async fn restore_db_instance_from_db_snapshot(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
    let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
    let vpc_security_group_ids = parse_vpc_security_group_ids(request);

    let runtime = self.runtime.as_ref().ok_or_else(|| {
        AwsServiceError::aws_error(
            StatusCode::SERVICE_UNAVAILABLE,
            "InvalidParameterValue",
            "Docker/Podman is required for RDS DB instances but is not available",
        )
    })?;

    // Reserve the identifier and copy everything needed out of state inside one block so the
    // write guard is dropped before any `.await` below.
    let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
        let mut state = self.state.write();

        if !state.begin_instance_creation(&db_instance_identifier) {
            return Err(AwsServiceError::aws_error(
                StatusCode::CONFLICT,
                "DBInstanceAlreadyExists",
                format!("DBInstance {db_instance_identifier} already exists."),
            ));
        }

        let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
            Some(s) => s,
            None => {
                // The reservation was already taken above; release it before bailing out.
                state.cancel_instance_creation(&db_instance_identifier);
                return Err(db_snapshot_not_found(&db_snapshot_identifier));
            }
        };

        let dbi_resource_id = state.next_dbi_resource_id();
        let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
        let created_at = Utc::now();

        (snapshot, dbi_resource_id, db_instance_arn, created_at)
    };

    // Snapshots without a database name fall back to "postgres".
    let db_name = snapshot.db_name.as_deref().unwrap_or("postgres");
    let running = match runtime
        .ensure_postgres(
            &db_instance_identifier,
            &snapshot.engine,
            &snapshot.engine_version,
            &snapshot.master_username,
            &snapshot.master_user_password,
            db_name,
        )
        .await
    {
        Ok(running) => running,
        Err(e) => {
            // Starting the container failed: release the reservation.
            self.state
                .write()
                .cancel_instance_creation(&db_instance_identifier);
            return Err(runtime_error_to_service_error(e));
        }
    };

    if let Err(e) = runtime
        .restore_database(
            &db_instance_identifier,
            &snapshot.master_username,
            db_name,
            &snapshot.dump_data,
        )
        .await
    {
        // The container is already up at this point, so it is stopped in addition to releasing
        // the reservation.
        self.state
            .write()
            .cancel_instance_creation(&db_instance_identifier);
        runtime.stop_container(&db_instance_identifier).await;
        return Err(runtime_error_to_service_error(e));
    }

    let mut state = self.state.write();

    // Fields not stored on the snapshot (instance class, backup settings, ...) use fixed values.
    let instance = DbInstance {
        db_instance_identifier: db_instance_identifier.clone(),
        db_instance_arn,
        db_instance_class: "db.t3.micro".to_string(),
        engine: snapshot.engine.clone(),
        engine_version: snapshot.engine_version.clone(),
        db_instance_status: "available".to_string(),
        master_username: snapshot.master_username.clone(),
        db_name: snapshot.db_name.clone(),
        endpoint_address: "127.0.0.1".to_string(),
        port: i32::from(running.host_port),
        allocated_storage: snapshot.allocated_storage,
        publicly_accessible: true,
        deletion_protection: false,
        created_at,
        dbi_resource_id,
        master_user_password: snapshot.master_user_password.clone(),
        container_id: running.container_id,
        host_port: running.host_port,
        tags: Vec::new(),
        read_replica_source_db_instance_identifier: None,
        read_replica_db_instance_identifiers: Vec::new(),
        vpc_security_group_ids,
        db_parameter_group_name: None,
        backup_retention_period: 1,
        preferred_backup_window: "03:00-04:00".to_string(),
        latest_restorable_time: created_at,
        option_group_name: None,
        multi_az: false,
        pending_modified_values: None,
    };

    state.finish_instance_creation(instance.clone());

    Ok(AwsResponse::xml(
        StatusCode::OK,
        xml_wrap(
            "RestoreDBInstanceFromDBSnapshot",
            &format!(
                "<DBInstance>{}</DBInstance>",
                db_instance_xml(&instance, None)
            ),
            &request.request_id,
        ),
    ))
}
1147
/// Handles `CreateDBInstanceReadReplica`: creates a new instance seeded from an existing one.
///
/// The source instance's database is dumped through the runtime, a new container is started
/// with the source's engine settings and credentials, and the dump is restored into it. On
/// success the replica is registered and its identifier is appended to the source's
/// `read_replica_db_instance_identifiers`.
///
/// # Errors
///
/// - `MissingParameter` when `DBInstanceIdentifier` or `SourceDBInstanceIdentifier` is absent.
/// - `InvalidParameterValue` (HTTP 503) when no container runtime is configured.
/// - `DBInstanceAlreadyExists` (HTTP 409) when the identifier cannot be reserved.
/// - The error built by `db_instance_not_found` when the source is unknown, either up front or
///   when it is looked up again after the container work has finished.
/// - The converted runtime error when dumping, starting the container or restoring fails.
async fn create_db_instance_read_replica(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
    let source_db_instance_identifier = required_param(request, "SourceDBInstanceIdentifier")?;

    let runtime = self.runtime.as_ref().ok_or_else(|| {
        AwsServiceError::aws_error(
            StatusCode::SERVICE_UNAVAILABLE,
            "InvalidParameterValue",
            "Docker/Podman is required for RDS read replicas but is not available",
        )
    })?;

    // Reserve the replica identifier and clone the source inside one block so the write guard
    // is dropped before any `.await` below.
    let (source_instance, db_name) = {
        let mut state = self.state.write();

        if !state.begin_instance_creation(&db_instance_identifier) {
            return Err(AwsServiceError::aws_error(
                StatusCode::CONFLICT,
                "DBInstanceAlreadyExists",
                format!("DBInstance {db_instance_identifier} already exists."),
            ));
        }

        let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
        {
            Some(inst) => inst,
            None => {
                // The reservation was already taken above; release it before bailing out.
                state.cancel_instance_creation(&db_instance_identifier);
                return Err(db_instance_not_found(&source_db_instance_identifier));
            }
        };

        // Sources without a database name fall back to "postgres".
        let db_name = source_instance
            .db_name
            .as_deref()
            .unwrap_or("postgres")
            .to_string();

        (source_instance, db_name)
    };

    let dump_data = match runtime
        .dump_database(
            &source_db_instance_identifier,
            &source_instance.master_username,
            &db_name,
        )
        .await
    {
        Ok(data) => data,
        Err(e) => {
            // No replica container exists yet, so only the reservation is released.
            self.state
                .write()
                .cancel_instance_creation(&db_instance_identifier);
            return Err(runtime_error_to_service_error(e));
        }
    };

    let dbi_resource_id = self.state.read().next_dbi_resource_id();
    let db_instance_arn = self.state.read().db_instance_arn(&db_instance_identifier);
    let created_at = Utc::now();

    let running = match runtime
        .ensure_postgres(
            &db_instance_identifier,
            &source_instance.engine,
            &source_instance.engine_version,
            &source_instance.master_username,
            &source_instance.master_user_password,
            &db_name,
        )
        .await
    {
        Ok(running) => running,
        Err(e) => {
            self.state
                .write()
                .cancel_instance_creation(&db_instance_identifier);
            return Err(runtime_error_to_service_error(e));
        }
    };

    if let Err(e) = runtime
        .restore_database(
            &db_instance_identifier,
            &source_instance.master_username,
            &db_name,
            &dump_data,
        )
        .await
    {
        // The replica container is already up, so it is stopped in addition to releasing the
        // reservation.
        self.state
            .write()
            .cancel_instance_creation(&db_instance_identifier);
        runtime.stop_container(&db_instance_identifier).await;
        return Err(runtime_error_to_service_error(e));
    }

    // Most settings are copied from the source snapshot taken at the start of this call;
    // deletion protection is always off and the tag list starts empty.
    let replica = DbInstance {
        db_instance_identifier: db_instance_identifier.clone(),
        db_instance_arn,
        db_instance_class: source_instance.db_instance_class.clone(),
        engine: source_instance.engine.clone(),
        engine_version: source_instance.engine_version.clone(),
        db_instance_status: "available".to_string(),
        master_username: source_instance.master_username.clone(),
        db_name: source_instance.db_name.clone(),
        endpoint_address: "127.0.0.1".to_string(),
        port: i32::from(running.host_port),
        allocated_storage: source_instance.allocated_storage,
        publicly_accessible: source_instance.publicly_accessible,
        deletion_protection: false,
        created_at,
        dbi_resource_id,
        master_user_password: source_instance.master_user_password.clone(),
        container_id: running.container_id,
        host_port: running.host_port,
        tags: Vec::new(),
        read_replica_source_db_instance_identifier: Some(source_db_instance_identifier.clone()),
        read_replica_db_instance_identifiers: Vec::new(),
        vpc_security_group_ids: source_instance.vpc_security_group_ids.clone(),
        db_parameter_group_name: source_instance.db_parameter_group_name.clone(),
        backup_retention_period: source_instance.backup_retention_period,
        preferred_backup_window: source_instance.preferred_backup_window.clone(),
        latest_restorable_time: created_at,
        option_group_name: source_instance.option_group_name.clone(),
        multi_az: source_instance.multi_az,
        pending_modified_values: None,
    };

    // The lock was released during the awaits above, so the source is looked up again. The
    // result is carried out of the block as a flag so the guard is dropped before the
    // `stop_container` await.
    let source_missing = {
        let mut state = self.state.write();
        match state.instances.get_mut(&source_db_instance_identifier) {
            Some(source) => {
                source
                    .read_replica_db_instance_identifiers
                    .push(db_instance_identifier.clone());
                state.finish_instance_creation(replica.clone());
                false
            }
            None => {
                state.cancel_instance_creation(&db_instance_identifier);
                true
            }
        }
    };

    if source_missing {
        runtime.stop_container(&db_instance_identifier).await;
        return Err(db_instance_not_found(&source_db_instance_identifier));
    }

    Ok(AwsResponse::xml(
        StatusCode::OK,
        xml_wrap(
            "CreateDBInstanceReadReplica",
            &format!(
                "<DBInstance>{}</DBInstance>",
                db_instance_xml(&replica, None)
            ),
            &request.request_id,
        ),
    ))
}
1315
1316 fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1317 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1318 let db_subnet_group_description = required_param(request, "DBSubnetGroupDescription")?;
1319 let subnet_ids = parse_subnet_ids(request)?;
1320
1321 if subnet_ids.is_empty() {
1322 return Err(AwsServiceError::aws_error(
1323 StatusCode::BAD_REQUEST,
1324 "InvalidParameterValue",
1325 "At least one subnet must be specified.",
1326 ));
1327 }
1328
1329 if subnet_ids.len() < 2 {
1330 return Err(AwsServiceError::aws_error(
1331 StatusCode::BAD_REQUEST,
1332 "DBSubnetGroupDoesNotCoverEnoughAZs",
1333 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1334 ));
1335 }
1336
1337 let mut state = self.state.write();
1338
1339 if state.subnet_groups.contains_key(&db_subnet_group_name) {
1340 return Err(AwsServiceError::aws_error(
1341 StatusCode::CONFLICT,
1342 "DBSubnetGroupAlreadyExists",
1343 format!("DBSubnetGroup {db_subnet_group_name} already exists."),
1344 ));
1345 }
1346
1347 let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
1348 let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1349 .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
1350 .collect();
1351
1352 let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1354 if unique_azs.len() < 2 {
1355 return Err(AwsServiceError::aws_error(
1356 StatusCode::BAD_REQUEST,
1357 "DBSubnetGroupDoesNotCoverEnoughAZs",
1358 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1359 ));
1360 }
1361
1362 let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
1363 let tags = parse_tags(request)?;
1364
1365 let subnet_group = DbSubnetGroup {
1366 db_subnet_group_name: db_subnet_group_name.clone(),
1367 db_subnet_group_arn,
1368 db_subnet_group_description,
1369 vpc_id,
1370 subnet_ids,
1371 subnet_availability_zones,
1372 tags,
1373 };
1374
1375 state
1376 .subnet_groups
1377 .insert(db_subnet_group_name, subnet_group.clone());
1378
1379 Ok(AwsResponse::xml(
1380 StatusCode::OK,
1381 xml_wrap(
1382 "CreateDBSubnetGroup",
1383 &format!(
1384 "<DBSubnetGroup>{}</DBSubnetGroup>",
1385 db_subnet_group_xml(&subnet_group)
1386 ),
1387 &request.request_id,
1388 ),
1389 ))
1390 }
1391
1392 fn describe_db_subnet_groups(
1393 &self,
1394 request: &AwsRequest,
1395 ) -> Result<AwsResponse, AwsServiceError> {
1396 let db_subnet_group_name = optional_param(request, "DBSubnetGroupName");
1397 let marker = optional_param(request, "Marker");
1398 let max_records = optional_param(request, "MaxRecords");
1399
1400 let state = self.state.read();
1401
1402 if let Some(name) = db_subnet_group_name {
1404 let sg = state.subnet_groups.get(&name).ok_or_else(|| {
1405 AwsServiceError::aws_error(
1406 StatusCode::NOT_FOUND,
1407 "DBSubnetGroupNotFoundFault",
1408 format!("DBSubnetGroup {} not found.", name),
1409 )
1410 })?;
1411
1412 return Ok(AwsResponse::xml(
1413 StatusCode::OK,
1414 xml_wrap(
1415 "DescribeDBSubnetGroups",
1416 &format!(
1417 "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
1418 db_subnet_group_xml(sg)
1419 ),
1420 &request.request_id,
1421 ),
1422 ));
1423 }
1424
1425 let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
1427 subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
1428
1429 let paginated = paginate(subnet_groups, marker, max_records, |sg| {
1431 &sg.db_subnet_group_name
1432 })?;
1433
1434 let marker_xml = paginated
1435 .next_marker
1436 .as_ref()
1437 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1438 .unwrap_or_default();
1439
1440 let body = paginated
1441 .items
1442 .iter()
1443 .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
1444 .collect::<Vec<_>>()
1445 .join("");
1446
1447 Ok(AwsResponse::xml(
1448 StatusCode::OK,
1449 xml_wrap(
1450 "DescribeDBSubnetGroups",
1451 &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
1452 &request.request_id,
1453 ),
1454 ))
1455 }
1456
1457 fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1458 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1459
1460 let mut state = self.state.write();
1461
1462 if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
1463 return Err(AwsServiceError::aws_error(
1464 StatusCode::NOT_FOUND,
1465 "DBSubnetGroupNotFoundFault",
1466 format!("DBSubnetGroup {db_subnet_group_name} not found."),
1467 ));
1468 }
1469
1470 Ok(AwsResponse::xml(
1471 StatusCode::OK,
1472 xml_wrap("DeleteDBSubnetGroup", "", &request.request_id),
1473 ))
1474 }
1475
1476 fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1477 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1478 let subnet_ids = parse_subnet_ids(request)?;
1479
1480 if subnet_ids.is_empty() {
1481 return Err(AwsServiceError::aws_error(
1482 StatusCode::BAD_REQUEST,
1483 "InvalidParameterValue",
1484 "At least one subnet must be specified.",
1485 ));
1486 }
1487
1488 if subnet_ids.len() < 2 {
1489 return Err(AwsServiceError::aws_error(
1490 StatusCode::BAD_REQUEST,
1491 "DBSubnetGroupDoesNotCoverEnoughAZs",
1492 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1493 ));
1494 }
1495
1496 let mut state = self.state.write();
1497
1498 let region = state.region.clone();
1499
1500 let subnet_group = state
1501 .subnet_groups
1502 .get_mut(&db_subnet_group_name)
1503 .ok_or_else(|| {
1504 AwsServiceError::aws_error(
1505 StatusCode::NOT_FOUND,
1506 "DBSubnetGroupNotFoundFault",
1507 format!("DBSubnetGroup {db_subnet_group_name} not found."),
1508 )
1509 })?;
1510
1511 let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1512 .map(|i| format!("{}{}", ®ion, char::from(b'a' + (i % 6) as u8)))
1513 .collect();
1514
1515 let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1517 if unique_azs.len() < 2 {
1518 return Err(AwsServiceError::aws_error(
1519 StatusCode::BAD_REQUEST,
1520 "DBSubnetGroupDoesNotCoverEnoughAZs",
1521 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1522 ));
1523 }
1524
1525 subnet_group.subnet_ids = subnet_ids;
1526 subnet_group.subnet_availability_zones = subnet_availability_zones;
1527
1528 let subnet_group_clone = subnet_group.clone();
1529
1530 Ok(AwsResponse::xml(
1531 StatusCode::OK,
1532 xml_wrap(
1533 "ModifyDBSubnetGroup",
1534 &format!(
1535 "<DBSubnetGroup>{}</DBSubnetGroup>",
1536 db_subnet_group_xml(&subnet_group_clone)
1537 ),
1538 &request.request_id,
1539 ),
1540 ))
1541 }
1542
1543 fn create_db_parameter_group(
1544 &self,
1545 request: &AwsRequest,
1546 ) -> Result<AwsResponse, AwsServiceError> {
1547 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1548 let db_parameter_group_family = required_param(request, "DBParameterGroupFamily")?;
1549 let description = required_param(request, "Description")?;
1550
1551 let valid_families = [
1553 "postgres16",
1554 "postgres15",
1555 "postgres14",
1556 "postgres13",
1557 "mysql8.0",
1558 "mysql5.7",
1559 "mariadb10.11",
1560 "mariadb10.6",
1561 ];
1562
1563 if !valid_families.contains(&db_parameter_group_family.as_str()) {
1564 return Err(AwsServiceError::aws_error(
1565 StatusCode::BAD_REQUEST,
1566 "InvalidParameterValue",
1567 format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
1568 ));
1569 }
1570
1571 let mut state = self.state.write();
1572
1573 if state
1574 .parameter_groups
1575 .contains_key(&db_parameter_group_name)
1576 {
1577 return Err(AwsServiceError::aws_error(
1578 StatusCode::CONFLICT,
1579 "DBParameterGroupAlreadyExists",
1580 format!("DBParameterGroup {db_parameter_group_name} already exists."),
1581 ));
1582 }
1583
1584 let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
1585 let tags = parse_tags(request)?;
1586
1587 let parameter_group = DbParameterGroup {
1588 db_parameter_group_name: db_parameter_group_name.clone(),
1589 db_parameter_group_arn,
1590 db_parameter_group_family,
1591 description,
1592 parameters: std::collections::HashMap::new(),
1593 tags,
1594 };
1595
1596 state
1597 .parameter_groups
1598 .insert(db_parameter_group_name, parameter_group.clone());
1599
1600 Ok(AwsResponse::xml(
1601 StatusCode::OK,
1602 xml_wrap(
1603 "CreateDBParameterGroup",
1604 &format!(
1605 "<DBParameterGroup>{}</DBParameterGroup>",
1606 db_parameter_group_xml(¶meter_group)
1607 ),
1608 &request.request_id,
1609 ),
1610 ))
1611 }
1612
1613 fn describe_db_parameter_groups(
1614 &self,
1615 request: &AwsRequest,
1616 ) -> Result<AwsResponse, AwsServiceError> {
1617 let db_parameter_group_name = optional_param(request, "DBParameterGroupName");
1618 let marker = optional_param(request, "Marker");
1619 let max_records = optional_param(request, "MaxRecords");
1620
1621 let state = self.state.read();
1622
1623 if let Some(name) = db_parameter_group_name {
1625 let pg = state.parameter_groups.get(&name).ok_or_else(|| {
1626 AwsServiceError::aws_error(
1627 StatusCode::NOT_FOUND,
1628 "DBParameterGroupNotFound",
1629 format!("DBParameterGroup {} not found.", name),
1630 )
1631 })?;
1632
1633 return Ok(AwsResponse::xml(
1634 StatusCode::OK,
1635 xml_wrap(
1636 "DescribeDBParameterGroups",
1637 &format!(
1638 "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
1639 db_parameter_group_xml(pg)
1640 ),
1641 &request.request_id,
1642 ),
1643 ));
1644 }
1645
1646 let mut parameter_groups: Vec<DbParameterGroup> =
1648 state.parameter_groups.values().cloned().collect();
1649 parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
1650
1651 let paginated = paginate(parameter_groups, marker, max_records, |pg| {
1653 &pg.db_parameter_group_name
1654 })?;
1655
1656 let marker_xml = paginated
1657 .next_marker
1658 .as_ref()
1659 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1660 .unwrap_or_default();
1661
1662 let body = paginated
1663 .items
1664 .iter()
1665 .map(|pg| {
1666 format!(
1667 "<DBParameterGroup>{}</DBParameterGroup>",
1668 db_parameter_group_xml(pg)
1669 )
1670 })
1671 .collect::<Vec<_>>()
1672 .join("");
1673
1674 Ok(AwsResponse::xml(
1675 StatusCode::OK,
1676 xml_wrap(
1677 "DescribeDBParameterGroups",
1678 &format!(
1679 "<DBParameterGroups>{}</DBParameterGroups>{}",
1680 body, marker_xml
1681 ),
1682 &request.request_id,
1683 ),
1684 ))
1685 }
1686
1687 fn delete_db_parameter_group(
1688 &self,
1689 request: &AwsRequest,
1690 ) -> Result<AwsResponse, AwsServiceError> {
1691 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1692
1693 let mut state = self.state.write();
1694
1695 if db_parameter_group_name.starts_with("default.") {
1696 return Err(AwsServiceError::aws_error(
1697 StatusCode::BAD_REQUEST,
1698 "InvalidParameterValue",
1699 "Cannot delete default parameter groups.",
1700 ));
1701 }
1702
1703 if state
1704 .parameter_groups
1705 .remove(&db_parameter_group_name)
1706 .is_none()
1707 {
1708 return Err(AwsServiceError::aws_error(
1709 StatusCode::NOT_FOUND,
1710 "DBParameterGroupNotFound",
1711 format!("DBParameterGroup {db_parameter_group_name} not found."),
1712 ));
1713 }
1714
1715 Ok(AwsResponse::xml(
1716 StatusCode::OK,
1717 xml_wrap("DeleteDBParameterGroup", "", &request.request_id),
1718 ))
1719 }
1720
1721 fn modify_db_parameter_group(
1722 &self,
1723 request: &AwsRequest,
1724 ) -> Result<AwsResponse, AwsServiceError> {
1725 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1726
1727 let mut state = self.state.write();
1728
1729 let parameter_group = state
1730 .parameter_groups
1731 .get_mut(&db_parameter_group_name)
1732 .ok_or_else(|| {
1733 AwsServiceError::aws_error(
1734 StatusCode::NOT_FOUND,
1735 "DBParameterGroupNotFound",
1736 format!("DBParameterGroup {db_parameter_group_name} not found."),
1737 )
1738 })?;
1739
1740 if let Some(new_description) = optional_param(request, "Description") {
1741 parameter_group.description = new_description;
1742 }
1743
1744 let parameter_group_clone = parameter_group.clone();
1745
1746 Ok(AwsResponse::xml(
1747 StatusCode::OK,
1748 xml_wrap(
1749 "ModifyDBParameterGroup",
1750 &format!(
1751 "<DBParameterGroupName>{}</DBParameterGroupName>",
1752 xml_escape(¶meter_group_clone.db_parameter_group_name)
1753 ),
1754 &request.request_id,
1755 ),
1756 ))
1757 }
1758}
1759
1760fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
1761 req.query_params
1762 .get(name)
1763 .cloned()
1764 .filter(|value| !value.is_empty())
1765}
1766
1767fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
1768 optional_param(req, name).ok_or_else(|| {
1769 AwsServiceError::aws_error(
1770 StatusCode::BAD_REQUEST,
1771 "MissingParameter",
1772 format!("The request must contain the parameter {name}."),
1773 )
1774 })
1775}
1776
1777fn required_i32_param(req: &AwsRequest, name: &str) -> Result<i32, AwsServiceError> {
1778 let value = required_param(req, name)?;
1779 value.parse::<i32>().map_err(|_| {
1780 AwsServiceError::aws_error(
1781 StatusCode::BAD_REQUEST,
1782 "InvalidParameterValue",
1783 format!("Parameter {name} must be a valid integer."),
1784 )
1785 })
1786}
1787
1788fn optional_i32_param(req: &AwsRequest, name: &str) -> Result<Option<i32>, AwsServiceError> {
1789 optional_param(req, name)
1790 .map(|value| {
1791 value.parse::<i32>().map_err(|_| {
1792 AwsServiceError::aws_error(
1793 StatusCode::BAD_REQUEST,
1794 "InvalidParameterValue",
1795 format!("Parameter {name} must be a valid integer."),
1796 )
1797 })
1798 })
1799 .transpose()
1800}
1801
1802fn parse_tags(req: &AwsRequest) -> Result<Vec<RdsTag>, AwsServiceError> {
1803 let mut tags = Vec::new();
1804 for index in 1.. {
1805 let key_name = format!("Tags.Tag.{index}.Key");
1806 let value_name = format!("Tags.Tag.{index}.Value");
1807 let key = optional_param(req, &key_name);
1808 let value = optional_param(req, &value_name);
1809
1810 match (key, value) {
1811 (Some(key), Some(value)) => tags.push(RdsTag { key, value }),
1812 (None, None) => break,
1813 _ => {
1814 return Err(AwsServiceError::aws_error(
1815 StatusCode::BAD_REQUEST,
1816 "InvalidParameterValue",
1817 "Each tag must include both Key and Value.",
1818 ));
1819 }
1820 }
1821 }
1822
1823 Ok(tags)
1824}
1825
1826fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
1827 let mut keys = Vec::new();
1828 for index in 1.. {
1829 let key_name = format!("TagKeys.member.{index}");
1830 match optional_param(req, &key_name) {
1831 Some(key) => keys.push(key),
1832 None => break,
1833 }
1834 }
1835
1836 Ok(keys)
1837}
1838
1839fn parse_subnet_ids(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
1840 let mut subnet_ids = Vec::new();
1841 for index in 1.. {
1842 let subnet_id_name = format!("SubnetIds.SubnetIdentifier.{index}");
1843 match optional_param(req, &subnet_id_name) {
1844 Some(subnet_id) => subnet_ids.push(subnet_id),
1845 None => break,
1846 }
1847 }
1848
1849 Ok(subnet_ids)
1850}
1851
1852fn parse_vpc_security_group_ids(req: &AwsRequest) -> Vec<String> {
1853 let mut security_group_ids = Vec::new();
1854 for index in 1.. {
1855 let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
1856 match optional_param(req, &sg_id_name) {
1857 Some(sg_id) => security_group_ids.push(sg_id),
1858 None => break,
1859 }
1860 }
1861
1862 if security_group_ids.is_empty() {
1864 security_group_ids.push("sg-default".to_string());
1865 }
1866
1867 security_group_ids
1868}
1869
1870fn query_param_prefix_exists(req: &AwsRequest, prefix: &str) -> bool {
1871 req.query_params.keys().any(|key| key.starts_with(prefix))
1872}
1873
1874fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
1875 value
1876 .map(|raw| match raw {
1877 "true" | "True" | "TRUE" => Ok(true),
1878 "false" | "False" | "FALSE" => Ok(false),
1879 _ => Err(AwsServiceError::aws_error(
1880 StatusCode::BAD_REQUEST,
1881 "InvalidParameterValue",
1882 format!("Boolean parameter value '{raw}' is invalid."),
1883 )),
1884 })
1885 .transpose()
1886}
1887
/// One page of results produced by `paginate`.
struct PaginationResult<T> {
    /// The items belonging to this page, in the order they were supplied.
    items: Vec<T>,
    /// Base64-encoded identifier of the last item on this page when further items remain after
    /// it; `None` otherwise.
    next_marker: Option<String>,
}
1892
1893fn paginate<T, F>(
1894 mut items: Vec<T>,
1895 marker: Option<String>,
1896 max_records: Option<String>,
1897 get_id: F,
1898) -> Result<PaginationResult<T>, AwsServiceError>
1899where
1900 F: Fn(&T) -> &str,
1901{
1902 let max = if let Some(max_str) = max_records {
1904 let parsed = max_str.parse::<i32>().map_err(|_| {
1905 AwsServiceError::aws_error(
1906 StatusCode::BAD_REQUEST,
1907 "InvalidParameterValue",
1908 "MaxRecords must be a valid integer.",
1909 )
1910 })?;
1911 if !(1..=100).contains(&parsed) {
1912 return Err(AwsServiceError::aws_error(
1913 StatusCode::BAD_REQUEST,
1914 "InvalidParameterValue",
1915 "MaxRecords must be between 1 and 100.",
1916 ));
1917 }
1918 parsed as usize
1919 } else {
1920 100
1921 };
1922
1923 let start_id = if let Some(encoded_marker) = marker {
1925 let decoded = BASE64.decode(encoded_marker.as_bytes()).map_err(|_| {
1926 AwsServiceError::aws_error(
1927 StatusCode::BAD_REQUEST,
1928 "InvalidParameterValue",
1929 "Marker is invalid.",
1930 )
1931 })?;
1932 let id = String::from_utf8(decoded).map_err(|_| {
1933 AwsServiceError::aws_error(
1934 StatusCode::BAD_REQUEST,
1935 "InvalidParameterValue",
1936 "Marker is invalid.",
1937 )
1938 })?;
1939 Some(id)
1940 } else {
1941 None
1942 };
1943
1944 let start_index = if let Some(ref start_id) = start_id {
1946 items
1947 .iter()
1948 .position(|item| get_id(item) == start_id)
1949 .map(|pos| pos + 1) .unwrap_or(items.len()) } else {
1952 0
1953 };
1954
1955 let total_items = items.len();
1957 let end_index = std::cmp::min(start_index + max, total_items);
1958 let paginated_items: Vec<T> = items.drain(start_index..end_index).collect();
1959
1960 let next_marker = if end_index < total_items {
1962 paginated_items
1963 .last()
1964 .map(|item| BASE64.encode(get_id(item).as_bytes()))
1965 } else {
1966 None
1967 };
1968
1969 Ok(PaginationResult {
1970 items: paginated_items,
1971 next_marker,
1972 })
1973}
1974
1975fn validate_create_request(
1976 db_instance_identifier: &str,
1977 allocated_storage: i32,
1978 db_instance_class: &str,
1979 engine: &str,
1980 engine_version: &str,
1981 port: i32,
1982) -> Result<(), AwsServiceError> {
1983 if allocated_storage <= 0 {
1984 return Err(AwsServiceError::aws_error(
1985 StatusCode::BAD_REQUEST,
1986 "InvalidParameterValue",
1987 "AllocatedStorage must be greater than zero.",
1988 ));
1989 }
1990 if port <= 0 {
1991 return Err(AwsServiceError::aws_error(
1992 StatusCode::BAD_REQUEST,
1993 "InvalidParameterValue",
1994 "Port must be greater than zero.",
1995 ));
1996 }
1997 if !db_instance_identifier
1998 .chars()
1999 .all(|ch| ch.is_ascii_alphanumeric() || ch == '-')
2000 {
2001 return Err(AwsServiceError::aws_error(
2002 StatusCode::BAD_REQUEST,
2003 "InvalidParameterValue",
2004 "DBInstanceIdentifier must contain only alphanumeric characters or hyphens.",
2005 ));
2006 }
2007 let supported_engines = ["postgres", "mysql", "mariadb"];
2009 if !supported_engines.contains(&engine) {
2010 return Err(AwsServiceError::aws_error(
2011 StatusCode::BAD_REQUEST,
2012 "InvalidParameterValue",
2013 format!("Engine '{}' is not supported.", engine),
2014 ));
2015 }
2016
2017 let supported_versions = match engine {
2019 "postgres" => vec!["16.3", "15.5", "14.10", "13.13"],
2020 "mysql" => vec!["8.0.35", "8.0.28", "5.7.44"],
2021 "mariadb" => vec!["10.11.6", "10.6.16"],
2022 _ => vec![],
2023 };
2024
2025 if !supported_versions.contains(&engine_version) {
2026 return Err(AwsServiceError::aws_error(
2027 StatusCode::BAD_REQUEST,
2028 "InvalidParameterValue",
2029 format!("EngineVersion '{engine_version}' is not supported yet."),
2030 ));
2031 }
2032 validate_db_instance_class(db_instance_class)?;
2033 Ok(())
2034}
2035
2036fn validate_db_instance_class(db_instance_class: &str) -> Result<(), AwsServiceError> {
2037 let supported_classes = [
2038 "db.t3.micro",
2039 "db.t3.small",
2040 "db.t3.medium",
2041 "db.t3.large",
2042 "db.t4g.micro",
2043 "db.t4g.small",
2044 "db.m5.large",
2045 ];
2046 if !supported_classes.contains(&db_instance_class) {
2047 return Err(AwsServiceError::aws_error(
2048 StatusCode::BAD_REQUEST,
2049 "InvalidParameterValue",
2050 format!("DBInstanceClass '{}' is not supported.", db_instance_class),
2051 ));
2052 }
2053 Ok(())
2054}
2055
2056fn filter_engine_versions(
2057 versions: &[EngineVersionInfo],
2058 engine: &Option<String>,
2059 engine_version: &Option<String>,
2060 family: &Option<String>,
2061) -> Vec<EngineVersionInfo> {
2062 versions
2063 .iter()
2064 .filter(|candidate| {
2065 engine
2066 .as_ref()
2067 .is_none_or(|expected| candidate.engine == *expected)
2068 })
2069 .filter(|candidate| {
2070 engine_version
2071 .as_ref()
2072 .is_none_or(|expected| candidate.engine_version == *expected)
2073 })
2074 .filter(|candidate| {
2075 family
2076 .as_ref()
2077 .is_none_or(|expected| candidate.db_parameter_group_family == *expected)
2078 })
2079 .cloned()
2080 .collect()
2081}
2082
2083fn filter_orderable_options(
2084 options: &[OrderableDbInstanceOption],
2085 engine: &Option<String>,
2086 engine_version: &Option<String>,
2087 db_instance_class: &Option<String>,
2088 license_model: &Option<String>,
2089 vpc: Option<bool>,
2090) -> Vec<OrderableDbInstanceOption> {
2091 options
2092 .iter()
2093 .filter(|candidate| {
2094 engine
2095 .as_ref()
2096 .is_none_or(|expected| candidate.engine == *expected)
2097 })
2098 .filter(|candidate| {
2099 engine_version
2100 .as_ref()
2101 .is_none_or(|expected| candidate.engine_version == *expected)
2102 })
2103 .filter(|candidate| {
2104 db_instance_class
2105 .as_ref()
2106 .is_none_or(|expected| candidate.db_instance_class == *expected)
2107 })
2108 .filter(|candidate| {
2109 license_model
2110 .as_ref()
2111 .is_none_or(|expected| candidate.license_model == *expected)
2112 })
2113 .filter(|_| vpc.unwrap_or(true))
2114 .cloned()
2115 .collect()
2116}
2117
2118fn xml_wrap(action: &str, inner: &str, request_id: &str) -> String {
2119 format!(
2120 "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
2121 <{action}Response xmlns=\"{RDS_NS}\">\
2122 <{action}Result>{inner}</{action}Result>\
2123 <ResponseMetadata><RequestId>{request_id}</RequestId></ResponseMetadata>\
2124 </{action}Response>"
2125 )
2126}
2127
2128fn engine_version_xml(version: &EngineVersionInfo) -> String {
2129 format!(
2130 "<DBEngineVersion>\
2131 <Engine>{}</Engine>\
2132 <EngineVersion>{}</EngineVersion>\
2133 <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2134 <DBEngineDescription>{}</DBEngineDescription>\
2135 <DBEngineVersionDescription>{}</DBEngineVersionDescription>\
2136 <Status>{}</Status>\
2137 </DBEngineVersion>",
2138 xml_escape(&version.engine),
2139 xml_escape(&version.engine_version),
2140 xml_escape(&version.db_parameter_group_family),
2141 xml_escape(&version.db_engine_description),
2142 xml_escape(&version.db_engine_version_description),
2143 xml_escape(&version.status),
2144 )
2145}
2146
2147fn orderable_option_xml(option: &OrderableDbInstanceOption) -> String {
2148 format!(
2149 "<OrderableDBInstanceOption>\
2150 <Engine>{}</Engine>\
2151 <EngineVersion>{}</EngineVersion>\
2152 <DBInstanceClass>{}</DBInstanceClass>\
2153 <LicenseModel>{}</LicenseModel>\
2154 <AvailabilityZones><AvailabilityZone><Name>us-east-1a</Name></AvailabilityZone></AvailabilityZones>\
2155 <MultiAZCapable>true</MultiAZCapable>\
2156 <ReadReplicaCapable>true</ReadReplicaCapable>\
2157 <Vpc>true</Vpc>\
2158 <SupportsStorageEncryption>true</SupportsStorageEncryption>\
2159 <StorageType>{}</StorageType>\
2160 <SupportsIops>false</SupportsIops>\
2161 <MinStorageSize>{}</MinStorageSize>\
2162 <MaxStorageSize>{}</MaxStorageSize>\
2163 <SupportsIAMDatabaseAuthentication>true</SupportsIAMDatabaseAuthentication>\
2164 </OrderableDBInstanceOption>",
2165 xml_escape(&option.engine),
2166 xml_escape(&option.engine_version),
2167 xml_escape(&option.db_instance_class),
2168 xml_escape(&option.license_model),
2169 xml_escape(&option.storage_type),
2170 option.min_storage_size,
2171 option.max_storage_size,
2172 )
2173}
2174
2175fn tag_xml(tag: &RdsTag) -> String {
2176 format!(
2177 "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2178 xml_escape(&tag.key),
2179 xml_escape(&tag.value),
2180 )
2181}
2182
2183fn db_instance_xml(instance: &DbInstance, status_override: Option<&str>) -> String {
2184 let status = status_override.unwrap_or(&instance.db_instance_status);
2185 let db_name_xml = instance
2186 .db_name
2187 .as_ref()
2188 .map(|db_name| format!("<DBName>{}</DBName>", xml_escape(db_name)))
2189 .unwrap_or_default();
2190
2191 let read_replica_source_xml = instance
2192 .read_replica_source_db_instance_identifier
2193 .as_ref()
2194 .map(|source| {
2195 format!(
2196 "<ReadReplicaSourceDBInstanceIdentifier>{}</ReadReplicaSourceDBInstanceIdentifier>",
2197 xml_escape(source)
2198 )
2199 })
2200 .unwrap_or_default();
2201
2202 let read_replica_identifiers_xml = if instance.read_replica_db_instance_identifiers.is_empty() {
2203 "<ReadReplicaDBInstanceIdentifiers/>".to_string()
2204 } else {
2205 format!(
2206 "<ReadReplicaDBInstanceIdentifiers>{}</ReadReplicaDBInstanceIdentifiers>",
2207 instance
2208 .read_replica_db_instance_identifiers
2209 .iter()
2210 .map(|id| format!(
2211 "<ReadReplicaDBInstanceIdentifier>{}</ReadReplicaDBInstanceIdentifier>",
2212 xml_escape(id)
2213 ))
2214 .collect::<String>()
2215 )
2216 };
2217
2218 let vpc_security_groups_xml = if instance.vpc_security_group_ids.is_empty() {
2219 "<VpcSecurityGroups/>".to_string()
2220 } else {
2221 format!(
2222 "<VpcSecurityGroups>{}</VpcSecurityGroups>",
2223 instance
2224 .vpc_security_group_ids
2225 .iter()
2226 .map(|sg_id| format!(
2227 "<VpcSecurityGroupMembership>\
2228 <VpcSecurityGroupId>{}</VpcSecurityGroupId>\
2229 <Status>active</Status>\
2230 </VpcSecurityGroupMembership>",
2231 xml_escape(sg_id)
2232 ))
2233 .collect::<String>()
2234 )
2235 };
2236
2237 let db_parameter_groups_xml = match &instance.db_parameter_group_name {
2238 Some(pg_name) => format!(
2239 "<DBParameterGroups>\
2240 <DBParameterGroup>\
2241 <DBParameterGroupName>{}</DBParameterGroupName>\
2242 <ParameterApplyStatus>in-sync</ParameterApplyStatus>\
2243 </DBParameterGroup>\
2244 </DBParameterGroups>",
2245 xml_escape(pg_name)
2246 ),
2247 None => "<DBParameterGroups/>".to_string(),
2248 };
2249
2250 let option_group_memberships_xml = match &instance.option_group_name {
2251 Some(og_name) => format!(
2252 "<OptionGroupMemberships>\
2253 <OptionGroupMembership>\
2254 <OptionGroupName>{}</OptionGroupName>\
2255 <Status>in-sync</Status>\
2256 </OptionGroupMembership>\
2257 </OptionGroupMemberships>",
2258 xml_escape(og_name)
2259 ),
2260 None => "<OptionGroupMemberships/>".to_string(),
2261 };
2262
2263 let pending_modified_values_xml = if let Some(ref pending) = instance.pending_modified_values {
2264 let mut fields = Vec::new();
2265 if let Some(ref class) = pending.db_instance_class {
2266 fields.push(format!(
2267 "<DBInstanceClass>{}</DBInstanceClass>",
2268 xml_escape(class)
2269 ));
2270 }
2271 if let Some(allocated_storage) = pending.allocated_storage {
2272 fields.push(format!(
2273 "<AllocatedStorage>{}</AllocatedStorage>",
2274 allocated_storage
2275 ));
2276 }
2277 if let Some(backup_retention_period) = pending.backup_retention_period {
2278 fields.push(format!(
2279 "<BackupRetentionPeriod>{}</BackupRetentionPeriod>",
2280 backup_retention_period
2281 ));
2282 }
2283 if let Some(multi_az) = pending.multi_az {
2284 fields.push(format!(
2285 "<MultiAZ>{}</MultiAZ>",
2286 if multi_az { "true" } else { "false" }
2287 ));
2288 }
2289 if let Some(ref engine_version) = pending.engine_version {
2290 fields.push(format!(
2291 "<EngineVersion>{}</EngineVersion>",
2292 xml_escape(engine_version)
2293 ));
2294 }
2295 if pending.master_user_password.is_some() {
2296 fields.push("<MasterUserPassword>****</MasterUserPassword>".to_string());
2297 }
2298 if !fields.is_empty() {
2299 format!(
2300 "<PendingModifiedValues>{}</PendingModifiedValues>",
2301 fields.join("")
2302 )
2303 } else {
2304 String::new()
2305 }
2306 } else {
2307 String::new()
2308 };
2309
2310 format!(
2311 "<DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2312 <DBInstanceClass>{}</DBInstanceClass>\
2313 <Engine>{}</Engine>\
2314 <DBInstanceStatus>{}</DBInstanceStatus>\
2315 <MasterUsername>{}</MasterUsername>\
2316 {}\
2317 <Endpoint><Address>{}</Address><Port>{}</Port></Endpoint>\
2318 <AllocatedStorage>{}</AllocatedStorage>\
2319 <InstanceCreateTime>{}</InstanceCreateTime>\
2320 <PreferredBackupWindow>{}</PreferredBackupWindow>\
2321 <BackupRetentionPeriod>{}</BackupRetentionPeriod>\
2322 <DBSecurityGroups/>\
2323 {}\
2324 {}\
2325 <AvailabilityZone>us-east-1a</AvailabilityZone>\
2326 <LatestRestorableTime>{}</LatestRestorableTime>\
2327 <PreferredMaintenanceWindow>sun:00:00-sun:00:30</PreferredMaintenanceWindow>\
2328 <MultiAZ>{}</MultiAZ>\
2329 <EngineVersion>{}</EngineVersion>\
2330 <AutoMinorVersionUpgrade>true</AutoMinorVersionUpgrade>\
2331 {}\
2332 {}\
2333 <LicenseModel>postgresql-license</LicenseModel>\
2334 {}\
2335 <PubliclyAccessible>{}</PubliclyAccessible>\
2336 <StorageType>gp2</StorageType>\
2337 <DbInstancePort>{}</DbInstancePort>\
2338 <StorageEncrypted>false</StorageEncrypted>\
2339 <DbiResourceId>{}</DbiResourceId>\
2340 <DeletionProtection>{}</DeletionProtection>\
2341 {}\
2342 <DBInstanceArn>{}</DBInstanceArn>",
2343 xml_escape(&instance.db_instance_identifier),
2344 xml_escape(&instance.db_instance_class),
2345 xml_escape(&instance.engine),
2346 xml_escape(status),
2347 xml_escape(&instance.master_username),
2348 db_name_xml,
2349 xml_escape(&instance.endpoint_address),
2350 instance.port,
2351 instance.allocated_storage,
2352 instance.created_at.to_rfc3339(),
2353 xml_escape(&instance.preferred_backup_window),
2354 instance.backup_retention_period,
2355 vpc_security_groups_xml,
2356 db_parameter_groups_xml,
2357 instance.latest_restorable_time.to_rfc3339(),
2358 if instance.multi_az { "true" } else { "false" },
2359 xml_escape(&instance.engine_version),
2360 read_replica_identifiers_xml,
2361 read_replica_source_xml,
2362 option_group_memberships_xml,
2363 if instance.publicly_accessible {
2364 "true"
2365 } else {
2366 "false"
2367 },
2368 instance.port,
2369 xml_escape(&instance.dbi_resource_id),
2370 if instance.deletion_protection {
2371 "true"
2372 } else {
2373 "false"
2374 },
2375 pending_modified_values_xml,
2376 xml_escape(&instance.db_instance_arn),
2377 )
2378}
2379
2380fn db_snapshot_xml(snapshot: &DbSnapshot) -> String {
2381 format!(
2382 "<DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\
2383 <DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2384 <SnapshotCreateTime>{}</SnapshotCreateTime>\
2385 <Engine>{}</Engine>\
2386 <EngineVersion>{}</EngineVersion>\
2387 <AllocatedStorage>{}</AllocatedStorage>\
2388 <Status>{}</Status>\
2389 <Port>{}</Port>\
2390 <MasterUsername>{}</MasterUsername>\
2391 {}\
2392 <DbiResourceId>{}</DbiResourceId>\
2393 <SnapshotType>{}</SnapshotType>\
2394 <DBSnapshotArn>{}</DBSnapshotArn>",
2395 xml_escape(&snapshot.db_snapshot_identifier),
2396 xml_escape(&snapshot.db_instance_identifier),
2397 snapshot.snapshot_create_time.to_rfc3339(),
2398 xml_escape(&snapshot.engine),
2399 xml_escape(&snapshot.engine_version),
2400 snapshot.allocated_storage,
2401 xml_escape(&snapshot.status),
2402 snapshot.port,
2403 xml_escape(&snapshot.master_username),
2404 snapshot
2405 .db_name
2406 .as_ref()
2407 .map(|name| format!("<DBName>{}</DBName>", xml_escape(name)))
2408 .unwrap_or_default(),
2409 xml_escape(&snapshot.dbi_resource_id),
2410 xml_escape(&snapshot.snapshot_type),
2411 xml_escape(&snapshot.db_snapshot_arn),
2412 )
2413}
2414
2415fn db_subnet_group_xml(subnet_group: &DbSubnetGroup) -> String {
2416 let subnets_xml = subnet_group
2417 .subnet_ids
2418 .iter()
2419 .zip(&subnet_group.subnet_availability_zones)
2420 .map(|(subnet_id, az)| {
2421 format!(
2422 "<Subnet>\
2423 <SubnetIdentifier>{}</SubnetIdentifier>\
2424 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
2425 <SubnetStatus>Active</SubnetStatus>\
2426 </Subnet>",
2427 xml_escape(subnet_id),
2428 xml_escape(az)
2429 )
2430 })
2431 .collect::<String>();
2432
2433 format!(
2434 "<DBSubnetGroupName>{}</DBSubnetGroupName>\
2435 <DBSubnetGroupDescription>{}</DBSubnetGroupDescription>\
2436 <VpcId>{}</VpcId>\
2437 <SubnetGroupStatus>Complete</SubnetGroupStatus>\
2438 <Subnets>{}</Subnets>\
2439 <DBSubnetGroupArn>{}</DBSubnetGroupArn>",
2440 xml_escape(&subnet_group.db_subnet_group_name),
2441 xml_escape(&subnet_group.db_subnet_group_description),
2442 xml_escape(&subnet_group.vpc_id),
2443 subnets_xml,
2444 xml_escape(&subnet_group.db_subnet_group_arn),
2445 )
2446}
2447
2448fn db_parameter_group_xml(parameter_group: &DbParameterGroup) -> String {
2449 format!(
2450 "<DBParameterGroupName>{}</DBParameterGroupName>\
2451 <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2452 <Description>{}</Description>\
2453 <DBParameterGroupArn>{}</DBParameterGroupArn>",
2454 xml_escape(¶meter_group.db_parameter_group_name),
2455 xml_escape(¶meter_group.db_parameter_group_family),
2456 xml_escape(¶meter_group.description),
2457 xml_escape(¶meter_group.db_parameter_group_arn),
2458 )
2459}
2460
2461fn db_instance_not_found(identifier: &str) -> AwsServiceError {
2462 AwsServiceError::aws_error(
2463 StatusCode::NOT_FOUND,
2464 "DBInstanceNotFound",
2465 format!("DBInstance {} not found.", identifier),
2466 )
2467}
2468
2469fn db_snapshot_not_found(identifier: &str) -> AwsServiceError {
2470 AwsServiceError::aws_error(
2471 StatusCode::NOT_FOUND,
2472 "DBSnapshotNotFound",
2473 format!("DBSnapshot {} not found.", identifier),
2474 )
2475}
2476
2477fn db_instance_not_found_by_arn(resource_name: &str) -> AwsServiceError {
2478 AwsServiceError::aws_error(
2479 StatusCode::NOT_FOUND,
2480 "DBInstanceNotFound",
2481 format!("DBInstance {resource_name} not found."),
2482 )
2483}
2484
2485fn find_instance_by_arn<'a>(
2486 state: &'a crate::state::RdsState,
2487 resource_name: &str,
2488) -> Result<&'a DbInstance, AwsServiceError> {
2489 state
2490 .instances
2491 .values()
2492 .find(|instance| instance.db_instance_arn == resource_name)
2493 .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2494}
2495
2496fn find_instance_by_arn_mut<'a>(
2497 state: &'a mut crate::state::RdsState,
2498 resource_name: &str,
2499) -> Result<&'a mut DbInstance, AwsServiceError> {
2500 state
2501 .instances
2502 .values_mut()
2503 .find(|instance| instance.db_instance_arn == resource_name)
2504 .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2505}
2506
2507fn merge_tags(existing: &mut Vec<RdsTag>, incoming: &[RdsTag]) {
2508 for tag in incoming {
2509 if let Some(existing_tag) = existing
2510 .iter_mut()
2511 .find(|candidate| candidate.key == tag.key)
2512 {
2513 existing_tag.value = tag.value.clone();
2514 } else {
2515 existing.push(tag.clone());
2516 }
2517 }
2518}
2519
2520fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
2521 match error {
2522 RuntimeError::Unavailable => AwsServiceError::aws_error(
2523 StatusCode::SERVICE_UNAVAILABLE,
2524 "InvalidParameterValue",
2525 "Docker/Podman is required for RDS DB instances but is not available",
2526 ),
2527 RuntimeError::ContainerStartFailed(message) => AwsServiceError::aws_error(
2528 StatusCode::INTERNAL_SERVER_ERROR,
2529 "InternalFailure",
2530 message,
2531 ),
2532 }
2533}
2534
// Unit tests for the RDS service helpers: catalogue filtering, request
// validation and parameter parsing, XML rendering, tag handling, and one
// end-to-end call through `RdsService::handle`.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use bytes::Bytes;
    use chrono::Utc;
    use http::{HeaderMap, Method};
    use parking_lot::RwLock;
    use uuid::Uuid;

    use super::{
        db_instance_xml, filter_engine_versions, filter_orderable_options, merge_tags,
        optional_i32_param, parse_tag_keys, parse_tags, validate_create_request, RdsService,
    };
    use crate::state::{
        default_engine_versions, default_orderable_options, DbInstance, RdsState, RdsTag,
    };
    use fakecloud_core::service::{AwsRequest, AwsService};

    // Filtering the default engine catalogue by engine "postgres" is expected
    // to leave four entries, all of them postgres.
    #[test]
    fn filter_engine_versions_matches_requested_engine() {
        let versions = default_engine_versions();

        let filtered =
            filter_engine_versions(&versions, &Some("postgres".to_string()), &None, &None);

        assert_eq!(filtered.len(), 4);
        assert!(filtered.iter().all(|v| v.engine == "postgres"));
    }

    // With engine, version and instance class all pinned, exactly one
    // orderable option is expected and it must carry the requested class.
    #[test]
    fn filter_orderable_options_respects_instance_class() {
        let options = default_orderable_options();

        let filtered = filter_orderable_options(
            &options,
            &Some("postgres".to_string()),
            &Some("16.3".to_string()),
            &Some("db.t3.micro".to_string()),
            &None,
            Some(true),
        );

        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].db_instance_class, "db.t3.micro");
    }

    // A create request naming the engine "mysql" must be rejected with
    // `InvalidParameterValue`.
    #[test]
    fn validate_create_request_rejects_unsupported_engine() {
        let error = validate_create_request("test-db", 20, "db.t3.micro", "mysql", "16.3", 5432)
            .expect_err("unsupported engine");

        assert_eq!(error.code(), "InvalidParameterValue");
    }

    // A non-numeric value for an integer parameter is an
    // `InvalidParameterValue` error rather than a silent default.
    #[test]
    fn optional_i32_param_rejects_invalid_integer() {
        let request = request("CreateDBInstance", &[("Port", "not-a-number")]);

        let error = optional_i32_param(&request, "Port").expect_err("invalid port");

        assert_eq!(error.code(), "InvalidParameterValue");
    }

    // Renders a fully populated instance with a status override and checks
    // the identifier, the overridden status, and the endpoint address/port.
    #[test]
    fn db_instance_xml_renders_endpoint_and_status() {
        let created_at = Utc::now();
        let instance = DbInstance {
            db_instance_identifier: "test-db".to_string(),
            db_instance_arn: "arn:aws:rds:us-east-1:123456789012:db:test-db".to_string(),
            db_instance_class: "db.t3.micro".to_string(),
            engine: "postgres".to_string(),
            engine_version: "16.3".to_string(),
            db_instance_status: "available".to_string(),
            master_username: "admin".to_string(),
            db_name: Some("appdb".to_string()),
            endpoint_address: "127.0.0.1".to_string(),
            port: 15432,
            allocated_storage: 20,
            publicly_accessible: true,
            deletion_protection: false,
            created_at,
            dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
            master_user_password: "secret123".to_string(),
            container_id: "container".to_string(),
            host_port: 15432,
            tags: Vec::new(),
            read_replica_source_db_instance_identifier: None,
            read_replica_db_instance_identifiers: Vec::new(),
            vpc_security_group_ids: vec!["sg-12345678".to_string()],
            db_parameter_group_name: Some("default.postgres16".to_string()),
            backup_retention_period: 1,
            preferred_backup_window: "03:00-04:00".to_string(),
            latest_restorable_time: created_at,
            option_group_name: None,
            multi_az: false,
            pending_modified_values: None,
        };

        // The stored status is "available"; the override must win.
        let xml = db_instance_xml(&instance, Some("creating"));

        assert!(xml.contains("<DBInstanceIdentifier>test-db</DBInstanceIdentifier>"));
        assert!(xml.contains("<DBInstanceStatus>creating</DBInstanceStatus>"));
        assert!(xml.contains("<Address>127.0.0.1</Address><Port>15432</Port>"));
    }

    // Tags arrive as `Tags.Tag.N.Key` / `Tags.Tag.N.Value` query parameters
    // and are expected back in index order.
    #[test]
    fn parse_tags_reads_rds_query_shape() {
        let request = request(
            "AddTagsToResource",
            &[
                ("Tags.Tag.1.Key", "env"),
                ("Tags.Tag.1.Value", "dev"),
                ("Tags.Tag.2.Key", "team"),
                ("Tags.Tag.2.Value", "core"),
            ],
        );

        let tags = parse_tags(&request).expect("tags");

        assert_eq!(
            tags,
            vec![
                RdsTag {
                    key: "env".to_string(),
                    value: "dev".to_string(),
                },
                RdsTag {
                    key: "team".to_string(),
                    value: "core".to_string(),
                }
            ]
        );
    }

    // Tag keys to remove arrive as `TagKeys.member.N` query parameters.
    #[test]
    fn parse_tag_keys_reads_member_shape() {
        let request = request(
            "RemoveTagsFromResource",
            &[("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")],
        );

        let tag_keys = parse_tag_keys(&request).expect("tag keys");

        assert_eq!(tag_keys, vec!["env".to_string(), "team".to_string()]);
    }

    // Merging overwrites the value of an existing key in place and appends
    // tags whose key is new.
    #[test]
    fn merge_tags_updates_existing_values() {
        let mut tags = vec![RdsTag {
            key: "env".to_string(),
            value: "dev".to_string(),
        }];

        merge_tags(
            &mut tags,
            &[
                RdsTag {
                    key: "env".to_string(),
                    value: "prod".to_string(),
                },
                RdsTag {
                    key: "team".to_string(),
                    value: "core".to_string(),
                },
            ],
        );

        assert_eq!(tags.len(), 2);
        assert_eq!(tags[0].value, "prod");
        assert_eq!(tags[1].key, "team");
    }

    // End-to-end: a service built without a runtime answers
    // DescribeDBEngineVersions with an XML body for the postgres engine.
    #[tokio::test]
    async fn describe_engine_versions_returns_xml_body() {
        let service = RdsService::new(Arc::new(RwLock::new(RdsState::new(
            "123456789012",
            "us-east-1",
        ))));
        let request = request("DescribeDBEngineVersions", &[("Engine", "postgres")]);

        let response = service.handle(request).await.expect("response");
        let body = String::from_utf8(response.body.to_vec()).expect("utf8");

        assert!(body.contains("<DescribeDBEngineVersionsResponse"));
        assert!(body.contains("<Engine>postgres</Engine>"));
        assert!(body.contains("<DBParameterGroupFamily>postgres16</DBParameterGroupFamily>"));
    }

    // Test helper: builds a query-protocol POST `AwsRequest` for `action`.
    // The query parameters always contain `Action`, followed by the entries
    // of `params`; the body is empty and the request id is fixed.
    fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
        let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
        for (key, value) in params {
            query_params.insert((*key).to_string(), (*value).to_string());
        }

        AwsRequest {
            service: "rds".to_string(),
            action: action.to_string(),
            region: "us-east-1".to_string(),
            account_id: "123456789012".to_string(),
            request_id: "test-request-id".to_string(),
            headers: HeaderMap::new(),
            query_params,
            body: Bytes::new(),
            path_segments: vec![],
            raw_path: "/".to_string(),
            raw_query: String::new(),
            method: Method::POST,
            is_query_protocol: true,
            access_key_id: None,
        }
    }
}