Skip to main content

fakecloud_rds/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8
9use fakecloud_aws::xml::xml_escape;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11
12use crate::runtime::{RdsRuntime, RuntimeError};
13use crate::state::{
14    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
15    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsTag, SharedRdsState,
16};
17
// Namespace URI string for the RDS API (the path carries the date 2014-10-31).
// NOTE(review): presumably emitted as the `xmlns` of XML responses; the use
// site is outside this view — confirm.
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
// Action names returned by `AwsService::supported_actions` for this service.
// Every entry has a matching arm in `RdsService::handle`; keep the two in sync
// when adding or removing an action.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddTagsToResource",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "DeleteDBInstance",
    "DeleteDBParameterGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DescribeDBEngineVersions",
    "DescribeDBInstances",
    "DescribeDBParameterGroups",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeOrderableDBInstanceOptions",
    "ListTagsForResource",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBSubnetGroup",
    "RebootDBInstance",
    "RemoveTagsFromResource",
    "RestoreDBInstanceFromDBSnapshot",
];
44
45pub struct RdsService {
46    state: SharedRdsState,
47    runtime: Option<Arc<RdsRuntime>>,
48}
49
50impl RdsService {
51    pub fn new(state: SharedRdsState) -> Self {
52        Self {
53            state,
54            runtime: None,
55        }
56    }
57
58    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
59        self.runtime = Some(runtime);
60        self
61    }
62}
63
64#[async_trait]
65impl AwsService for RdsService {
66    fn service_name(&self) -> &str {
67        "rds"
68    }
69
70    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
71        match request.action.as_str() {
72            "AddTagsToResource" => self.add_tags_to_resource(&request),
73            "CreateDBInstance" => self.create_db_instance(&request).await,
74            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
75            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
76            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
77            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
78            "DeleteDBInstance" => self.delete_db_instance(&request).await,
79            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
80            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
81            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
82            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
83            "DescribeDBInstances" => self.describe_db_instances(&request),
84            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
85            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
86            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
87            "DescribeOrderableDBInstanceOptions" => {
88                self.describe_orderable_db_instance_options(&request)
89            }
90            "ListTagsForResource" => self.list_tags_for_resource(&request),
91            "ModifyDBInstance" => self.modify_db_instance(&request),
92            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
93            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
94            "RebootDBInstance" => self.reboot_db_instance(&request).await,
95            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
96            "RestoreDBInstanceFromDBSnapshot" => {
97                self.restore_db_instance_from_db_snapshot(&request).await
98            }
99            _ => Err(AwsServiceError::action_not_implemented(
100                self.service_name(),
101                &request.action,
102            )),
103        }
104    }
105
106    fn supported_actions(&self) -> &[&str] {
107        SUPPORTED_ACTIONS
108    }
109}
110
111impl RdsService {
112    async fn create_db_instance(
113        &self,
114        request: &AwsRequest,
115    ) -> Result<AwsResponse, AwsServiceError> {
116        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
117        let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
118        let db_instance_class = required_param(request, "DBInstanceClass")?;
119        let engine = required_param(request, "Engine")?;
120        let master_username = required_param(request, "MasterUsername")?;
121        let master_user_password = required_param(request, "MasterUserPassword")?;
122        let db_name = optional_param(request, "DBName");
123        let engine_version =
124            optional_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
125        let publicly_accessible =
126            parse_optional_bool(optional_param(request, "PubliclyAccessible").as_deref())?
127                .unwrap_or(true);
128        let deletion_protection =
129            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?
130                .unwrap_or(false);
131        // Default port based on engine
132        let default_port = match engine.as_str() {
133            "postgres" => 5432,
134            "mysql" | "mariadb" => 3306,
135            _ => 5432,
136        };
137        let port = optional_i32_param(request, "Port")?.unwrap_or(default_port);
138        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
139
140        // Default parameter group based on engine and version
141        let default_param_group = match engine.as_str() {
142            "postgres" => {
143                let major = engine_version.split('.').next().unwrap_or("16");
144                format!("default.postgres{}", major)
145            }
146            "mysql" => {
147                let major = if engine_version.starts_with("5.7") {
148                    "5.7"
149                } else {
150                    "8.0"
151                };
152                format!("default.mysql{}", major)
153            }
154            "mariadb" => {
155                let major = if engine_version.starts_with("10.11") {
156                    "10.11"
157                } else {
158                    "10.6"
159                };
160                format!("default.mariadb{}", major)
161            }
162            _ => "default.postgres16".to_string(),
163        };
164        let db_parameter_group_name =
165            optional_param(request, "DBParameterGroupName").or(Some(default_param_group));
166
167        let backup_retention_period =
168            optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
169        let preferred_backup_window = optional_param(request, "PreferredBackupWindow")
170            .unwrap_or_else(|| "03:00-04:00".to_string());
171        let option_group_name = optional_param(request, "OptionGroupName");
172        let multi_az =
173            parse_optional_bool(optional_param(request, "MultiAZ").as_deref())?.unwrap_or(false);
174
175        validate_create_request(
176            &db_instance_identifier,
177            allocated_storage,
178            &db_instance_class,
179            &engine,
180            &engine_version,
181            port,
182        )?;
183
184        {
185            let mut state = self.state.write();
186            if !state.begin_instance_creation(&db_instance_identifier) {
187                return Err(AwsServiceError::aws_error(
188                    StatusCode::BAD_REQUEST,
189                    "DBInstanceAlreadyExists",
190                    format!("DBInstance {} already exists.", db_instance_identifier),
191                ));
192            }
193        }
194
195        let runtime = self.runtime.as_ref().ok_or_else(|| {
196            AwsServiceError::aws_error(
197                StatusCode::SERVICE_UNAVAILABLE,
198                "InvalidParameterValue",
199                "Docker/Podman is required for RDS DB instances but is not available",
200            )
201        })?;
202
203        // Default database name based on engine
204        let logical_db_name = db_name.clone().unwrap_or_else(|| match engine.as_str() {
205            "postgres" => "postgres".to_string(),
206            "mysql" | "mariadb" => "mysql".to_string(),
207            _ => "postgres".to_string(),
208        });
209        let running = runtime
210            .ensure_postgres(
211                &db_instance_identifier,
212                &engine,
213                &engine_version,
214                &master_username,
215                &master_user_password,
216                &logical_db_name,
217            )
218            .await
219            .map_err(|error| {
220                self.state
221                    .write()
222                    .cancel_instance_creation(&db_instance_identifier);
223                runtime_error_to_service_error(error)
224            })?;
225
226        let mut state = self.state.write();
227        let created_at = Utc::now();
228        let instance = DbInstance {
229            db_instance_identifier: db_instance_identifier.clone(),
230            db_instance_arn: state.db_instance_arn(&db_instance_identifier),
231            db_instance_class: db_instance_class.clone(),
232            engine: engine.clone(),
233            engine_version: engine_version.clone(),
234            db_instance_status: "available".to_string(),
235            master_username: master_username.clone(),
236            db_name: db_name.clone(),
237            endpoint_address: "127.0.0.1".to_string(),
238            port: i32::from(running.host_port),
239            allocated_storage,
240            publicly_accessible,
241            deletion_protection,
242            created_at,
243            dbi_resource_id: state.next_dbi_resource_id(),
244            master_user_password,
245            container_id: running.container_id,
246            host_port: running.host_port,
247            tags: Vec::new(),
248            read_replica_source_db_instance_identifier: None,
249            read_replica_db_instance_identifiers: Vec::new(),
250            vpc_security_group_ids,
251            db_parameter_group_name,
252            backup_retention_period,
253            preferred_backup_window,
254            latest_restorable_time: created_at,
255            option_group_name,
256            multi_az,
257            pending_modified_values: None,
258        };
259        state.finish_instance_creation(instance.clone());
260
261        Ok(AwsResponse::xml(
262            StatusCode::OK,
263            xml_wrap(
264                "CreateDBInstance",
265                &format!(
266                    "<DBInstance>{}</DBInstance>",
267                    db_instance_xml(&instance, Some("creating"))
268                ),
269                &request.request_id,
270            ),
271        ))
272    }
273
274    async fn delete_db_instance(
275        &self,
276        request: &AwsRequest,
277    ) -> Result<AwsResponse, AwsServiceError> {
278        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
279        let skip_final_snapshot =
280            parse_optional_bool(optional_param(request, "SkipFinalSnapshot").as_deref())?
281                .unwrap_or(false);
282        let final_db_snapshot_identifier = optional_param(request, "FinalDBSnapshotIdentifier");
283
284        if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
285            return Err(AwsServiceError::aws_error(
286                StatusCode::BAD_REQUEST,
287                "InvalidParameterCombination",
288                "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
289            ));
290        }
291        if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
292            return Err(AwsServiceError::aws_error(
293                StatusCode::BAD_REQUEST,
294                "InvalidParameterCombination",
295                "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
296            ));
297        }
298
299        // Check deletion protection BEFORE creating snapshot or making any changes
300        {
301            let state = self.state.read();
302            if let Some(instance) = state.instances.get(&db_instance_identifier) {
303                if instance.deletion_protection {
304                    return Err(AwsServiceError::aws_error(
305                        StatusCode::BAD_REQUEST,
306                        "InvalidDBInstanceState",
307                        format!(
308                            "DBInstance {} cannot be deleted because deletion protection is enabled.",
309                            db_instance_identifier
310                        ),
311                    ));
312                }
313            } else {
314                return Err(db_instance_not_found(&db_instance_identifier));
315            }
316        }
317
318        // Create final snapshot if requested
319        if let Some(ref snapshot_id) = final_db_snapshot_identifier {
320            let runtime = self.runtime.as_ref().ok_or_else(|| {
321                AwsServiceError::aws_error(
322                    StatusCode::SERVICE_UNAVAILABLE,
323                    "InvalidParameterValue",
324                    "Docker/Podman is required for RDS snapshots but is not available",
325                )
326            })?;
327
328            let (instance_for_snapshot, db_name) = {
329                let state = self.state.read();
330
331                if state.snapshots.contains_key(snapshot_id) {
332                    return Err(AwsServiceError::aws_error(
333                        StatusCode::CONFLICT,
334                        "DBSnapshotAlreadyExists",
335                        format!("DBSnapshot {snapshot_id} already exists."),
336                    ));
337                }
338
339                let instance = state
340                    .instances
341                    .get(&db_instance_identifier)
342                    .cloned()
343                    .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
344
345                let db_name = instance
346                    .db_name
347                    .as_deref()
348                    .unwrap_or("postgres")
349                    .to_string();
350
351                (instance, db_name)
352            };
353
354            let dump_data = runtime
355                .dump_database(
356                    &db_instance_identifier,
357                    &instance_for_snapshot.master_username,
358                    &db_name,
359                )
360                .await
361                .map_err(runtime_error_to_service_error)?;
362
363            let mut state = self.state.write();
364
365            if state.snapshots.contains_key(snapshot_id) {
366                return Err(AwsServiceError::aws_error(
367                    StatusCode::CONFLICT,
368                    "DBSnapshotAlreadyExists",
369                    format!("DBSnapshot {snapshot_id} already exists."),
370                ));
371            }
372
373            let snapshot_arn = state.db_snapshot_arn(snapshot_id);
374
375            let snapshot = DbSnapshot {
376                db_snapshot_identifier: snapshot_id.clone(),
377                db_snapshot_arn: snapshot_arn,
378                db_instance_identifier: db_instance_identifier.clone(),
379                snapshot_create_time: Utc::now(),
380                engine: instance_for_snapshot.engine.clone(),
381                engine_version: instance_for_snapshot.engine_version.clone(),
382                allocated_storage: instance_for_snapshot.allocated_storage,
383                status: "available".to_string(),
384                port: instance_for_snapshot.port,
385                master_username: instance_for_snapshot.master_username.clone(),
386                db_name: instance_for_snapshot.db_name.clone(),
387                dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
388                snapshot_type: "manual".to_string(),
389                master_user_password: instance_for_snapshot.master_user_password.clone(),
390                tags: Vec::new(),
391                dump_data,
392            };
393
394            state.snapshots.insert(snapshot_id.clone(), snapshot);
395        }
396
397        let instance = {
398            let mut state = self.state.write();
399            let instance = state
400                .instances
401                .remove(&db_instance_identifier)
402                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
403
404            if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
405                if let Some(source) = state.instances.get_mut(source_id) {
406                    source
407                        .read_replica_db_instance_identifiers
408                        .retain(|id| id != &db_instance_identifier);
409                }
410            }
411
412            for replica_id in &instance.read_replica_db_instance_identifiers {
413                if let Some(replica) = state.instances.get_mut(replica_id) {
414                    replica.read_replica_source_db_instance_identifier = None;
415                }
416            }
417
418            instance
419        };
420
421        if let Some(runtime) = &self.runtime {
422            runtime.stop_container(&db_instance_identifier).await;
423        }
424
425        Ok(AwsResponse::xml(
426            StatusCode::OK,
427            xml_wrap(
428                "DeleteDBInstance",
429                &format!(
430                    "<DBInstance>{}</DBInstance>",
431                    db_instance_xml(&instance, Some("deleting"))
432                ),
433                &request.request_id,
434            ),
435        ))
436    }
437
438    fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
439        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
440        let db_instance_class = optional_param(request, "DBInstanceClass");
441        let deletion_protection =
442            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?;
443        let apply_immediately =
444            parse_optional_bool(optional_param(request, "ApplyImmediately").as_deref())?;
445
446        // Parse VPC security group IDs - only if at least one is provided
447        let vpc_security_group_ids = {
448            let mut ids = Vec::new();
449            for index in 1.. {
450                let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
451                match optional_param(request, &sg_id_name) {
452                    Some(sg_id) => ids.push(sg_id),
453                    None => break,
454                }
455            }
456            if ids.is_empty() {
457                None
458            } else {
459                Some(ids)
460            }
461        };
462
463        if db_instance_class.is_none()
464            && deletion_protection.is_none()
465            && vpc_security_group_ids.is_none()
466        {
467            return Err(AwsServiceError::aws_error(
468                StatusCode::BAD_REQUEST,
469                "InvalidParameterCombination",
470                "At least one supported mutable field must be provided.",
471            ));
472        }
473        if let Some(ref class) = db_instance_class {
474            validate_db_instance_class(class)?;
475        }
476
477        let mut state = self.state.write();
478        let instance = state
479            .instances
480            .get_mut(&db_instance_identifier)
481            .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
482
483        // If ApplyImmediately is false, stage changes as pending
484        if apply_immediately == Some(false) {
485            let pending = instance
486                .pending_modified_values
487                .get_or_insert(Default::default());
488            if let Some(class) = db_instance_class {
489                pending.db_instance_class = Some(class);
490            }
491            // Note: deletion_protection and vpc_security_group_ids are applied immediately
492            // regardless of ApplyImmediately flag (per AWS behavior)
493            if let Some(deletion_protection) = deletion_protection {
494                instance.deletion_protection = deletion_protection;
495            }
496            if let Some(security_group_ids) = vpc_security_group_ids {
497                instance.vpc_security_group_ids = security_group_ids;
498            }
499        } else {
500            // Apply immediately (default behavior)
501            if let Some(class) = db_instance_class {
502                instance.db_instance_class = class;
503            }
504            if let Some(deletion_protection) = deletion_protection {
505                instance.deletion_protection = deletion_protection;
506            }
507            if let Some(security_group_ids) = vpc_security_group_ids {
508                instance.vpc_security_group_ids = security_group_ids;
509            }
510        }
511
512        Ok(AwsResponse::xml(
513            StatusCode::OK,
514            xml_wrap(
515                "ModifyDBInstance",
516                &format!(
517                    "<DBInstance>{}</DBInstance>",
518                    db_instance_xml(instance, Some("modifying"))
519                ),
520                &request.request_id,
521            ),
522        ))
523    }
524
525    async fn reboot_db_instance(
526        &self,
527        request: &AwsRequest,
528    ) -> Result<AwsResponse, AwsServiceError> {
529        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
530        let force_failover =
531            parse_optional_bool(optional_param(request, "ForceFailover").as_deref())?;
532        if force_failover == Some(true) {
533            return Err(AwsServiceError::aws_error(
534                StatusCode::BAD_REQUEST,
535                "InvalidParameterCombination",
536                "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
537            ));
538        }
539
540        let instance = {
541            let state = self.state.read();
542            state
543                .instances
544                .get(&db_instance_identifier)
545                .cloned()
546                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
547        };
548
549        let runtime = self.runtime.as_ref().ok_or_else(|| {
550            AwsServiceError::aws_error(
551                StatusCode::SERVICE_UNAVAILABLE,
552                "InvalidParameterValue",
553                "Docker/Podman is required for RDS DB instances but is not available",
554            )
555        })?;
556
557        let running = runtime
558            .restart_container(
559                &db_instance_identifier,
560                &instance.engine,
561                &instance.master_username,
562                &instance.master_user_password,
563                instance.db_name.as_deref().unwrap_or("postgres"),
564            )
565            .await
566            .map_err(runtime_error_to_service_error)?;
567
568        let instance = {
569            let mut state = self.state.write();
570            let instance = state
571                .instances
572                .get_mut(&db_instance_identifier)
573                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
574            instance.host_port = running.host_port;
575            instance.port = i32::from(running.host_port);
576
577            // Apply any pending modifications
578            if let Some(pending) = instance.pending_modified_values.take() {
579                if let Some(class) = pending.db_instance_class {
580                    instance.db_instance_class = class;
581                }
582                if let Some(allocated_storage) = pending.allocated_storage {
583                    instance.allocated_storage = allocated_storage;
584                }
585                if let Some(backup_retention_period) = pending.backup_retention_period {
586                    instance.backup_retention_period = backup_retention_period;
587                }
588                if let Some(multi_az) = pending.multi_az {
589                    instance.multi_az = multi_az;
590                }
591                if let Some(engine_version) = pending.engine_version {
592                    instance.engine_version = engine_version;
593                }
594                if let Some(master_user_password) = pending.master_user_password {
595                    instance.master_user_password = master_user_password;
596                }
597            }
598
599            instance.clone()
600        };
601
602        Ok(AwsResponse::xml(
603            StatusCode::OK,
604            xml_wrap(
605                "RebootDBInstance",
606                &format!(
607                    "<DBInstance>{}</DBInstance>",
608                    db_instance_xml(&instance, Some("rebooting"))
609                ),
610                &request.request_id,
611            ),
612        ))
613    }
614
615    fn describe_db_engine_versions(
616        &self,
617        request: &AwsRequest,
618    ) -> Result<AwsResponse, AwsServiceError> {
619        let engine = optional_param(request, "Engine");
620        let engine_version = optional_param(request, "EngineVersion");
621        let family = optional_param(request, "DBParameterGroupFamily");
622        let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
623
624        let mut versions = filter_engine_versions(
625            &default_engine_versions(),
626            &engine,
627            &engine_version,
628            &family,
629        );
630
631        if default_only.unwrap_or(false) {
632            versions.truncate(1);
633        }
634
635        Ok(AwsResponse::xml(
636            StatusCode::OK,
637            xml_wrap(
638                "DescribeDBEngineVersions",
639                &format!(
640                    "<DBEngineVersions>{}</DBEngineVersions>",
641                    versions.iter().map(engine_version_xml).collect::<String>()
642                ),
643                &request.request_id,
644            ),
645        ))
646    }
647
648    fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
649        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
650        let marker = optional_param(request, "Marker");
651        let max_records = optional_param(request, "MaxRecords");
652
653        let state = self.state.read();
654
655        // If specific identifier requested, return just that one (no pagination)
656        if let Some(identifier) = db_instance_identifier {
657            let instance = state
658                .instances
659                .get(&identifier)
660                .cloned()
661                .ok_or_else(|| db_instance_not_found(&identifier))?;
662
663            return Ok(AwsResponse::xml(
664                StatusCode::OK,
665                xml_wrap(
666                    "DescribeDBInstances",
667                    &format!(
668                        "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
669                        db_instance_xml(&instance, None)
670                    ),
671                    &request.request_id,
672                ),
673            ));
674        }
675
676        // Get all instances sorted by created_at, then identifier
677        let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
678        instances.sort_by(|a, b| {
679            a.created_at
680                .cmp(&b.created_at)
681                .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
682        });
683
684        // Apply pagination
685        let paginated = paginate(instances, marker, max_records, |inst| {
686            &inst.db_instance_identifier
687        })?;
688
689        let marker_xml = paginated
690            .next_marker
691            .as_ref()
692            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
693            .unwrap_or_default();
694
695        Ok(AwsResponse::xml(
696            StatusCode::OK,
697            xml_wrap(
698                "DescribeDBInstances",
699                &format!(
700                    "<DBInstances>{}</DBInstances>{}",
701                    paginated
702                        .items
703                        .iter()
704                        .map(|instance| {
705                            format!(
706                                "<DBInstance>{}</DBInstance>",
707                                db_instance_xml(instance, None)
708                            )
709                        })
710                        .collect::<String>(),
711                    marker_xml
712                ),
713                &request.request_id,
714            ),
715        ))
716    }
717
718    fn describe_orderable_db_instance_options(
719        &self,
720        request: &AwsRequest,
721    ) -> Result<AwsResponse, AwsServiceError> {
722        let engine = optional_param(request, "Engine");
723        let engine_version = optional_param(request, "EngineVersion");
724        let db_instance_class = optional_param(request, "DBInstanceClass");
725        let license_model = optional_param(request, "LicenseModel");
726        let vpc = parse_optional_bool(optional_param(request, "Vpc").as_deref())?;
727
728        let options = filter_orderable_options(
729            &default_orderable_options(),
730            &engine,
731            &engine_version,
732            &db_instance_class,
733            &license_model,
734            vpc,
735        );
736
737        Ok(AwsResponse::xml(
738            StatusCode::OK,
739            xml_wrap(
740                "DescribeOrderableDBInstanceOptions",
741                &format!(
742                    "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
743                    options.iter().map(orderable_option_xml).collect::<String>()
744                ),
745                &request.request_id,
746            ),
747        ))
748    }
749
750    fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
751        let resource_name = required_param(request, "ResourceName")?;
752        let tags = parse_tags(request)?;
753
754        if tags.is_empty() {
755            return Err(AwsServiceError::aws_error(
756                StatusCode::BAD_REQUEST,
757                "MissingParameter",
758                "The request must contain the parameter Tags.",
759            ));
760        }
761
762        let mut state = self.state.write();
763        let instance = find_instance_by_arn_mut(&mut state, &resource_name)?;
764        merge_tags(&mut instance.tags, &tags);
765
766        Ok(AwsResponse::xml(
767            StatusCode::OK,
768            xml_wrap("AddTagsToResource", "", &request.request_id),
769        ))
770    }
771
772    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
773        let resource_name = required_param(request, "ResourceName")?;
774        if query_param_prefix_exists(request, "Filters.") {
775            return Err(AwsServiceError::aws_error(
776                StatusCode::BAD_REQUEST,
777                "InvalidParameterValue",
778                "Filters are not yet supported for ListTagsForResource.",
779            ));
780        }
781
782        let state = self.state.read();
783        let instance = find_instance_by_arn(&state, &resource_name)?;
784        let tag_xml = instance.tags.iter().map(tag_xml).collect::<String>();
785
786        Ok(AwsResponse::xml(
787            StatusCode::OK,
788            xml_wrap(
789                "ListTagsForResource",
790                &format!("<TagList>{tag_xml}</TagList>"),
791                &request.request_id,
792            ),
793        ))
794    }
795
796    fn remove_tags_from_resource(
797        &self,
798        request: &AwsRequest,
799    ) -> Result<AwsResponse, AwsServiceError> {
800        let resource_name = required_param(request, "ResourceName")?;
801        let tag_keys = parse_tag_keys(request)?;
802
803        if tag_keys.is_empty() {
804            return Err(AwsServiceError::aws_error(
805                StatusCode::BAD_REQUEST,
806                "MissingParameter",
807                "The request must contain the parameter TagKeys.",
808            ));
809        }
810
811        let mut state = self.state.write();
812        let instance = find_instance_by_arn_mut(&mut state, &resource_name)?;
813        instance
814            .tags
815            .retain(|tag| !tag_keys.iter().any(|key| key == &tag.key));
816
817        Ok(AwsResponse::xml(
818            StatusCode::OK,
819            xml_wrap("RemoveTagsFromResource", "", &request.request_id),
820        ))
821    }
822
823    async fn create_db_snapshot(
824        &self,
825        request: &AwsRequest,
826    ) -> Result<AwsResponse, AwsServiceError> {
827        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
828        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
829
830        let runtime = self.runtime.as_ref().ok_or_else(|| {
831            AwsServiceError::aws_error(
832                StatusCode::SERVICE_UNAVAILABLE,
833                "InvalidParameterValue",
834                "Docker/Podman is required for RDS snapshots but is not available",
835            )
836        })?;
837
838        let (instance, db_name) = {
839            let state = self.state.write();
840
841            if state.snapshots.contains_key(&db_snapshot_identifier) {
842                return Err(AwsServiceError::aws_error(
843                    StatusCode::CONFLICT,
844                    "DBSnapshotAlreadyExists",
845                    format!("DBSnapshot {db_snapshot_identifier} already exists."),
846                ));
847            }
848
849            let instance = state
850                .instances
851                .get(&db_instance_identifier)
852                .cloned()
853                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
854
855            let db_name = instance
856                .db_name
857                .as_deref()
858                .unwrap_or("postgres")
859                .to_string();
860
861            (instance, db_name)
862        };
863
864        let dump_data = runtime
865            .dump_database(&db_instance_identifier, &instance.master_username, &db_name)
866            .await
867            .map_err(runtime_error_to_service_error)?;
868
869        let mut state = self.state.write();
870
871        if state.snapshots.contains_key(&db_snapshot_identifier) {
872            return Err(AwsServiceError::aws_error(
873                StatusCode::CONFLICT,
874                "DBSnapshotAlreadyExists",
875                format!("DBSnapshot {db_snapshot_identifier} already exists."),
876            ));
877        }
878
879        let snapshot = DbSnapshot {
880            db_snapshot_identifier: db_snapshot_identifier.clone(),
881            db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
882            db_instance_identifier: instance.db_instance_identifier.clone(),
883            snapshot_create_time: Utc::now(),
884            engine: instance.engine.clone(),
885            engine_version: instance.engine_version.clone(),
886            allocated_storage: instance.allocated_storage,
887            status: "available".to_string(),
888            port: instance.port,
889            master_username: instance.master_username.clone(),
890            db_name: instance.db_name.clone(),
891            dbi_resource_id: instance.dbi_resource_id.clone(),
892            snapshot_type: "manual".to_string(),
893            master_user_password: instance.master_user_password.clone(),
894            tags: Vec::new(),
895            dump_data,
896        };
897
898        state
899            .snapshots
900            .insert(db_snapshot_identifier, snapshot.clone());
901
902        Ok(AwsResponse::xml(
903            StatusCode::OK,
904            xml_wrap(
905                "CreateDBSnapshot",
906                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
907                &request.request_id,
908            ),
909        ))
910    }
911
912    fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
913        let db_snapshot_identifier = optional_param(request, "DBSnapshotIdentifier");
914        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
915        let marker = optional_param(request, "Marker");
916        let max_records = optional_param(request, "MaxRecords");
917
918        if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
919            return Err(AwsServiceError::aws_error(
920                StatusCode::BAD_REQUEST,
921                "InvalidParameterCombination",
922                "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
923            ));
924        }
925
926        let state = self.state.read();
927
928        // If specific snapshot requested, return just that one (no pagination)
929        if let Some(snapshot_id) = db_snapshot_identifier {
930            let snapshot = state
931                .snapshots
932                .get(&snapshot_id)
933                .cloned()
934                .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
935
936            return Ok(AwsResponse::xml(
937                StatusCode::OK,
938                xml_wrap(
939                    "DescribeDBSnapshots",
940                    &format!(
941                        "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
942                        db_snapshot_xml(&snapshot)
943                    ),
944                    &request.request_id,
945                ),
946            ));
947        }
948
949        // Get snapshots, filtered by instance identifier if provided
950        let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
951            state
952                .snapshots
953                .values()
954                .filter(|s| s.db_instance_identifier == instance_id)
955                .cloned()
956                .collect()
957        } else {
958            state.snapshots.values().cloned().collect()
959        };
960
961        // Sort by creation time, then identifier
962        snapshots.sort_by(|a, b| {
963            a.snapshot_create_time
964                .cmp(&b.snapshot_create_time)
965                .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
966        });
967
968        // Apply pagination
969        let paginated = paginate(snapshots, marker, max_records, |snap| {
970            &snap.db_snapshot_identifier
971        })?;
972
973        let marker_xml = paginated
974            .next_marker
975            .as_ref()
976            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
977            .unwrap_or_default();
978
979        Ok(AwsResponse::xml(
980            StatusCode::OK,
981            xml_wrap(
982                "DescribeDBSnapshots",
983                &format!(
984                    "<DBSnapshots>{}</DBSnapshots>{}",
985                    paginated
986                        .items
987                        .iter()
988                        .map(|snapshot| format!(
989                            "<DBSnapshot>{}</DBSnapshot>",
990                            db_snapshot_xml(snapshot)
991                        ))
992                        .collect::<String>(),
993                    marker_xml
994                ),
995                &request.request_id,
996            ),
997        ))
998    }
999
1000    fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1001        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1002
1003        let mut state = self.state.write();
1004
1005        let snapshot = state
1006            .snapshots
1007            .remove(&db_snapshot_identifier)
1008            .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1009
1010        Ok(AwsResponse::xml(
1011            StatusCode::OK,
1012            xml_wrap(
1013                "DeleteDBSnapshot",
1014                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1015                &request.request_id,
1016            ),
1017        ))
1018    }
1019
1020    async fn restore_db_instance_from_db_snapshot(
1021        &self,
1022        request: &AwsRequest,
1023    ) -> Result<AwsResponse, AwsServiceError> {
1024        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1025        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1026        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
1027
1028        let runtime = self.runtime.as_ref().ok_or_else(|| {
1029            AwsServiceError::aws_error(
1030                StatusCode::SERVICE_UNAVAILABLE,
1031                "InvalidParameterValue",
1032                "Docker/Podman is required for RDS DB instances but is not available",
1033            )
1034        })?;
1035
1036        let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
1037            let mut state = self.state.write();
1038
1039            if !state.begin_instance_creation(&db_instance_identifier) {
1040                return Err(AwsServiceError::aws_error(
1041                    StatusCode::CONFLICT,
1042                    "DBInstanceAlreadyExists",
1043                    format!("DBInstance {db_instance_identifier} already exists."),
1044                ));
1045            }
1046
1047            let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
1048                Some(s) => s,
1049                None => {
1050                    state.cancel_instance_creation(&db_instance_identifier);
1051                    return Err(db_snapshot_not_found(&db_snapshot_identifier));
1052                }
1053            };
1054
1055            let dbi_resource_id = state.next_dbi_resource_id();
1056            let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
1057            let created_at = Utc::now();
1058
1059            (snapshot, dbi_resource_id, db_instance_arn, created_at)
1060        };
1061
1062        let db_name = snapshot.db_name.as_deref().unwrap_or("postgres");
1063        let running = match runtime
1064            .ensure_postgres(
1065                &db_instance_identifier,
1066                &snapshot.engine,
1067                &snapshot.engine_version,
1068                &snapshot.master_username,
1069                &snapshot.master_user_password,
1070                db_name,
1071            )
1072            .await
1073        {
1074            Ok(running) => running,
1075            Err(e) => {
1076                self.state
1077                    .write()
1078                    .cancel_instance_creation(&db_instance_identifier);
1079                return Err(runtime_error_to_service_error(e));
1080            }
1081        };
1082
1083        if let Err(e) = runtime
1084            .restore_database(
1085                &db_instance_identifier,
1086                &snapshot.master_username,
1087                db_name,
1088                &snapshot.dump_data,
1089            )
1090            .await
1091        {
1092            self.state
1093                .write()
1094                .cancel_instance_creation(&db_instance_identifier);
1095            runtime.stop_container(&db_instance_identifier).await;
1096            return Err(runtime_error_to_service_error(e));
1097        }
1098
1099        let mut state = self.state.write();
1100
1101        let instance = DbInstance {
1102            db_instance_identifier: db_instance_identifier.clone(),
1103            db_instance_arn,
1104            db_instance_class: "db.t3.micro".to_string(),
1105            engine: snapshot.engine.clone(),
1106            engine_version: snapshot.engine_version.clone(),
1107            db_instance_status: "available".to_string(),
1108            master_username: snapshot.master_username.clone(),
1109            db_name: snapshot.db_name.clone(),
1110            endpoint_address: "127.0.0.1".to_string(),
1111            port: i32::from(running.host_port),
1112            allocated_storage: snapshot.allocated_storage,
1113            publicly_accessible: true,
1114            deletion_protection: false,
1115            created_at,
1116            dbi_resource_id,
1117            master_user_password: snapshot.master_user_password.clone(),
1118            container_id: running.container_id,
1119            host_port: running.host_port,
1120            tags: Vec::new(),
1121            read_replica_source_db_instance_identifier: None,
1122            read_replica_db_instance_identifiers: Vec::new(),
1123            vpc_security_group_ids,
1124            db_parameter_group_name: None,
1125            backup_retention_period: 1,
1126            preferred_backup_window: "03:00-04:00".to_string(),
1127            latest_restorable_time: created_at,
1128            option_group_name: None,
1129            multi_az: false,
1130            pending_modified_values: None,
1131        };
1132
1133        state.finish_instance_creation(instance.clone());
1134
1135        Ok(AwsResponse::xml(
1136            StatusCode::OK,
1137            xml_wrap(
1138                "RestoreDBInstanceFromDBSnapshot",
1139                &format!(
1140                    "<DBInstance>{}</DBInstance>",
1141                    db_instance_xml(&instance, None)
1142                ),
1143                &request.request_id,
1144            ),
1145        ))
1146    }
1147
1148    async fn create_db_instance_read_replica(
1149        &self,
1150        request: &AwsRequest,
1151    ) -> Result<AwsResponse, AwsServiceError> {
1152        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1153        let source_db_instance_identifier = required_param(request, "SourceDBInstanceIdentifier")?;
1154
1155        let runtime = self.runtime.as_ref().ok_or_else(|| {
1156            AwsServiceError::aws_error(
1157                StatusCode::SERVICE_UNAVAILABLE,
1158                "InvalidParameterValue",
1159                "Docker/Podman is required for RDS read replicas but is not available",
1160            )
1161        })?;
1162
1163        let (source_instance, db_name) = {
1164            let mut state = self.state.write();
1165
1166            if !state.begin_instance_creation(&db_instance_identifier) {
1167                return Err(AwsServiceError::aws_error(
1168                    StatusCode::CONFLICT,
1169                    "DBInstanceAlreadyExists",
1170                    format!("DBInstance {db_instance_identifier} already exists."),
1171                ));
1172            }
1173
1174            let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
1175            {
1176                Some(inst) => inst,
1177                None => {
1178                    state.cancel_instance_creation(&db_instance_identifier);
1179                    return Err(db_instance_not_found(&source_db_instance_identifier));
1180                }
1181            };
1182
1183            let db_name = source_instance
1184                .db_name
1185                .as_deref()
1186                .unwrap_or("postgres")
1187                .to_string();
1188
1189            (source_instance, db_name)
1190        };
1191
1192        let dump_data = match runtime
1193            .dump_database(
1194                &source_db_instance_identifier,
1195                &source_instance.master_username,
1196                &db_name,
1197            )
1198            .await
1199        {
1200            Ok(data) => data,
1201            Err(e) => {
1202                self.state
1203                    .write()
1204                    .cancel_instance_creation(&db_instance_identifier);
1205                return Err(runtime_error_to_service_error(e));
1206            }
1207        };
1208
1209        let dbi_resource_id = self.state.read().next_dbi_resource_id();
1210        let db_instance_arn = self.state.read().db_instance_arn(&db_instance_identifier);
1211        let created_at = Utc::now();
1212
1213        let running = match runtime
1214            .ensure_postgres(
1215                &db_instance_identifier,
1216                &source_instance.engine,
1217                &source_instance.engine_version,
1218                &source_instance.master_username,
1219                &source_instance.master_user_password,
1220                &db_name,
1221            )
1222            .await
1223        {
1224            Ok(running) => running,
1225            Err(e) => {
1226                self.state
1227                    .write()
1228                    .cancel_instance_creation(&db_instance_identifier);
1229                return Err(runtime_error_to_service_error(e));
1230            }
1231        };
1232
1233        if let Err(e) = runtime
1234            .restore_database(
1235                &db_instance_identifier,
1236                &source_instance.master_username,
1237                &db_name,
1238                &dump_data,
1239            )
1240            .await
1241        {
1242            self.state
1243                .write()
1244                .cancel_instance_creation(&db_instance_identifier);
1245            runtime.stop_container(&db_instance_identifier).await;
1246            return Err(runtime_error_to_service_error(e));
1247        }
1248
1249        let replica = DbInstance {
1250            db_instance_identifier: db_instance_identifier.clone(),
1251            db_instance_arn,
1252            db_instance_class: source_instance.db_instance_class.clone(),
1253            engine: source_instance.engine.clone(),
1254            engine_version: source_instance.engine_version.clone(),
1255            db_instance_status: "available".to_string(),
1256            master_username: source_instance.master_username.clone(),
1257            db_name: source_instance.db_name.clone(),
1258            endpoint_address: "127.0.0.1".to_string(),
1259            port: i32::from(running.host_port),
1260            allocated_storage: source_instance.allocated_storage,
1261            publicly_accessible: source_instance.publicly_accessible,
1262            deletion_protection: false,
1263            created_at,
1264            dbi_resource_id,
1265            master_user_password: source_instance.master_user_password.clone(),
1266            container_id: running.container_id,
1267            host_port: running.host_port,
1268            tags: Vec::new(),
1269            read_replica_source_db_instance_identifier: Some(source_db_instance_identifier.clone()),
1270            read_replica_db_instance_identifiers: Vec::new(),
1271            vpc_security_group_ids: source_instance.vpc_security_group_ids.clone(),
1272            db_parameter_group_name: source_instance.db_parameter_group_name.clone(),
1273            backup_retention_period: source_instance.backup_retention_period,
1274            preferred_backup_window: source_instance.preferred_backup_window.clone(),
1275            latest_restorable_time: created_at,
1276            option_group_name: source_instance.option_group_name.clone(),
1277            multi_az: source_instance.multi_az,
1278            pending_modified_values: None,
1279        };
1280
1281        let source_missing = {
1282            let mut state = self.state.write();
1283            match state.instances.get_mut(&source_db_instance_identifier) {
1284                Some(source) => {
1285                    source
1286                        .read_replica_db_instance_identifiers
1287                        .push(db_instance_identifier.clone());
1288                    state.finish_instance_creation(replica.clone());
1289                    false
1290                }
1291                None => {
1292                    state.cancel_instance_creation(&db_instance_identifier);
1293                    true
1294                }
1295            }
1296        };
1297
1298        if source_missing {
1299            runtime.stop_container(&db_instance_identifier).await;
1300            return Err(db_instance_not_found(&source_db_instance_identifier));
1301        }
1302
1303        Ok(AwsResponse::xml(
1304            StatusCode::OK,
1305            xml_wrap(
1306                "CreateDBInstanceReadReplica",
1307                &format!(
1308                    "<DBInstance>{}</DBInstance>",
1309                    db_instance_xml(&replica, None)
1310                ),
1311                &request.request_id,
1312            ),
1313        ))
1314    }
1315
1316    fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1317        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1318        let db_subnet_group_description = required_param(request, "DBSubnetGroupDescription")?;
1319        let subnet_ids = parse_subnet_ids(request)?;
1320
1321        if subnet_ids.is_empty() {
1322            return Err(AwsServiceError::aws_error(
1323                StatusCode::BAD_REQUEST,
1324                "InvalidParameterValue",
1325                "At least one subnet must be specified.",
1326            ));
1327        }
1328
1329        if subnet_ids.len() < 2 {
1330            return Err(AwsServiceError::aws_error(
1331                StatusCode::BAD_REQUEST,
1332                "DBSubnetGroupDoesNotCoverEnoughAZs",
1333                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1334            ));
1335        }
1336
1337        let mut state = self.state.write();
1338
1339        if state.subnet_groups.contains_key(&db_subnet_group_name) {
1340            return Err(AwsServiceError::aws_error(
1341                StatusCode::CONFLICT,
1342                "DBSubnetGroupAlreadyExists",
1343                format!("DBSubnetGroup {db_subnet_group_name} already exists."),
1344            ));
1345        }
1346
1347        let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
1348        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1349            .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
1350            .collect();
1351
1352        // Validate that subnets span at least 2 unique Availability Zones
1353        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1354        if unique_azs.len() < 2 {
1355            return Err(AwsServiceError::aws_error(
1356                StatusCode::BAD_REQUEST,
1357                "DBSubnetGroupDoesNotCoverEnoughAZs",
1358                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1359            ));
1360        }
1361
1362        let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
1363        let tags = parse_tags(request)?;
1364
1365        let subnet_group = DbSubnetGroup {
1366            db_subnet_group_name: db_subnet_group_name.clone(),
1367            db_subnet_group_arn,
1368            db_subnet_group_description,
1369            vpc_id,
1370            subnet_ids,
1371            subnet_availability_zones,
1372            tags,
1373        };
1374
1375        state
1376            .subnet_groups
1377            .insert(db_subnet_group_name, subnet_group.clone());
1378
1379        Ok(AwsResponse::xml(
1380            StatusCode::OK,
1381            xml_wrap(
1382                "CreateDBSubnetGroup",
1383                &format!(
1384                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1385                    db_subnet_group_xml(&subnet_group)
1386                ),
1387                &request.request_id,
1388            ),
1389        ))
1390    }
1391
1392    fn describe_db_subnet_groups(
1393        &self,
1394        request: &AwsRequest,
1395    ) -> Result<AwsResponse, AwsServiceError> {
1396        let db_subnet_group_name = optional_param(request, "DBSubnetGroupName");
1397        let marker = optional_param(request, "Marker");
1398        let max_records = optional_param(request, "MaxRecords");
1399
1400        let state = self.state.read();
1401
1402        // If specific subnet group requested, return just that one (no pagination)
1403        if let Some(name) = db_subnet_group_name {
1404            let sg = state.subnet_groups.get(&name).ok_or_else(|| {
1405                AwsServiceError::aws_error(
1406                    StatusCode::NOT_FOUND,
1407                    "DBSubnetGroupNotFoundFault",
1408                    format!("DBSubnetGroup {} not found.", name),
1409                )
1410            })?;
1411
1412            return Ok(AwsResponse::xml(
1413                StatusCode::OK,
1414                xml_wrap(
1415                    "DescribeDBSubnetGroups",
1416                    &format!(
1417                        "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
1418                        db_subnet_group_xml(sg)
1419                    ),
1420                    &request.request_id,
1421                ),
1422            ));
1423        }
1424
1425        // Get all subnet groups sorted by name
1426        let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
1427        subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
1428
1429        // Apply pagination
1430        let paginated = paginate(subnet_groups, marker, max_records, |sg| {
1431            &sg.db_subnet_group_name
1432        })?;
1433
1434        let marker_xml = paginated
1435            .next_marker
1436            .as_ref()
1437            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1438            .unwrap_or_default();
1439
1440        let body = paginated
1441            .items
1442            .iter()
1443            .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
1444            .collect::<Vec<_>>()
1445            .join("");
1446
1447        Ok(AwsResponse::xml(
1448            StatusCode::OK,
1449            xml_wrap(
1450                "DescribeDBSubnetGroups",
1451                &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
1452                &request.request_id,
1453            ),
1454        ))
1455    }
1456
1457    fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1458        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1459
1460        let mut state = self.state.write();
1461
1462        if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
1463            return Err(AwsServiceError::aws_error(
1464                StatusCode::NOT_FOUND,
1465                "DBSubnetGroupNotFoundFault",
1466                format!("DBSubnetGroup {db_subnet_group_name} not found."),
1467            ));
1468        }
1469
1470        Ok(AwsResponse::xml(
1471            StatusCode::OK,
1472            xml_wrap("DeleteDBSubnetGroup", "", &request.request_id),
1473        ))
1474    }
1475
1476    fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1477        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1478        let subnet_ids = parse_subnet_ids(request)?;
1479
1480        if subnet_ids.is_empty() {
1481            return Err(AwsServiceError::aws_error(
1482                StatusCode::BAD_REQUEST,
1483                "InvalidParameterValue",
1484                "At least one subnet must be specified.",
1485            ));
1486        }
1487
1488        if subnet_ids.len() < 2 {
1489            return Err(AwsServiceError::aws_error(
1490                StatusCode::BAD_REQUEST,
1491                "DBSubnetGroupDoesNotCoverEnoughAZs",
1492                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1493            ));
1494        }
1495
1496        let mut state = self.state.write();
1497
1498        let region = state.region.clone();
1499
1500        let subnet_group = state
1501            .subnet_groups
1502            .get_mut(&db_subnet_group_name)
1503            .ok_or_else(|| {
1504                AwsServiceError::aws_error(
1505                    StatusCode::NOT_FOUND,
1506                    "DBSubnetGroupNotFoundFault",
1507                    format!("DBSubnetGroup {db_subnet_group_name} not found."),
1508                )
1509            })?;
1510
1511        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1512            .map(|i| format!("{}{}", &region, char::from(b'a' + (i % 6) as u8)))
1513            .collect();
1514
1515        // Validate that subnets span at least 2 unique Availability Zones
1516        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1517        if unique_azs.len() < 2 {
1518            return Err(AwsServiceError::aws_error(
1519                StatusCode::BAD_REQUEST,
1520                "DBSubnetGroupDoesNotCoverEnoughAZs",
1521                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1522            ));
1523        }
1524
1525        subnet_group.subnet_ids = subnet_ids;
1526        subnet_group.subnet_availability_zones = subnet_availability_zones;
1527
1528        let subnet_group_clone = subnet_group.clone();
1529
1530        Ok(AwsResponse::xml(
1531            StatusCode::OK,
1532            xml_wrap(
1533                "ModifyDBSubnetGroup",
1534                &format!(
1535                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1536                    db_subnet_group_xml(&subnet_group_clone)
1537                ),
1538                &request.request_id,
1539            ),
1540        ))
1541    }
1542
1543    fn create_db_parameter_group(
1544        &self,
1545        request: &AwsRequest,
1546    ) -> Result<AwsResponse, AwsServiceError> {
1547        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1548        let db_parameter_group_family = required_param(request, "DBParameterGroupFamily")?;
1549        let description = required_param(request, "Description")?;
1550
1551        // Validate parameter group family against supported engines and versions
1552        let valid_families = [
1553            "postgres16",
1554            "postgres15",
1555            "postgres14",
1556            "postgres13",
1557            "mysql8.0",
1558            "mysql5.7",
1559            "mariadb10.11",
1560            "mariadb10.6",
1561        ];
1562
1563        if !valid_families.contains(&db_parameter_group_family.as_str()) {
1564            return Err(AwsServiceError::aws_error(
1565                StatusCode::BAD_REQUEST,
1566                "InvalidParameterValue",
1567                format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
1568            ));
1569        }
1570
1571        let mut state = self.state.write();
1572
1573        if state
1574            .parameter_groups
1575            .contains_key(&db_parameter_group_name)
1576        {
1577            return Err(AwsServiceError::aws_error(
1578                StatusCode::CONFLICT,
1579                "DBParameterGroupAlreadyExists",
1580                format!("DBParameterGroup {db_parameter_group_name} already exists."),
1581            ));
1582        }
1583
1584        let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
1585        let tags = parse_tags(request)?;
1586
1587        let parameter_group = DbParameterGroup {
1588            db_parameter_group_name: db_parameter_group_name.clone(),
1589            db_parameter_group_arn,
1590            db_parameter_group_family,
1591            description,
1592            parameters: std::collections::HashMap::new(),
1593            tags,
1594        };
1595
1596        state
1597            .parameter_groups
1598            .insert(db_parameter_group_name, parameter_group.clone());
1599
1600        Ok(AwsResponse::xml(
1601            StatusCode::OK,
1602            xml_wrap(
1603                "CreateDBParameterGroup",
1604                &format!(
1605                    "<DBParameterGroup>{}</DBParameterGroup>",
1606                    db_parameter_group_xml(&parameter_group)
1607                ),
1608                &request.request_id,
1609            ),
1610        ))
1611    }
1612
1613    fn describe_db_parameter_groups(
1614        &self,
1615        request: &AwsRequest,
1616    ) -> Result<AwsResponse, AwsServiceError> {
1617        let db_parameter_group_name = optional_param(request, "DBParameterGroupName");
1618        let marker = optional_param(request, "Marker");
1619        let max_records = optional_param(request, "MaxRecords");
1620
1621        let state = self.state.read();
1622
1623        // If specific parameter group requested, return just that one (no pagination)
1624        if let Some(name) = db_parameter_group_name {
1625            let pg = state.parameter_groups.get(&name).ok_or_else(|| {
1626                AwsServiceError::aws_error(
1627                    StatusCode::NOT_FOUND,
1628                    "DBParameterGroupNotFound",
1629                    format!("DBParameterGroup {} not found.", name),
1630                )
1631            })?;
1632
1633            return Ok(AwsResponse::xml(
1634                StatusCode::OK,
1635                xml_wrap(
1636                    "DescribeDBParameterGroups",
1637                    &format!(
1638                        "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
1639                        db_parameter_group_xml(pg)
1640                    ),
1641                    &request.request_id,
1642                ),
1643            ));
1644        }
1645
1646        // Get all parameter groups sorted by name
1647        let mut parameter_groups: Vec<DbParameterGroup> =
1648            state.parameter_groups.values().cloned().collect();
1649        parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
1650
1651        // Apply pagination
1652        let paginated = paginate(parameter_groups, marker, max_records, |pg| {
1653            &pg.db_parameter_group_name
1654        })?;
1655
1656        let marker_xml = paginated
1657            .next_marker
1658            .as_ref()
1659            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1660            .unwrap_or_default();
1661
1662        let body = paginated
1663            .items
1664            .iter()
1665            .map(|pg| {
1666                format!(
1667                    "<DBParameterGroup>{}</DBParameterGroup>",
1668                    db_parameter_group_xml(pg)
1669                )
1670            })
1671            .collect::<Vec<_>>()
1672            .join("");
1673
1674        Ok(AwsResponse::xml(
1675            StatusCode::OK,
1676            xml_wrap(
1677                "DescribeDBParameterGroups",
1678                &format!(
1679                    "<DBParameterGroups>{}</DBParameterGroups>{}",
1680                    body, marker_xml
1681                ),
1682                &request.request_id,
1683            ),
1684        ))
1685    }
1686
1687    fn delete_db_parameter_group(
1688        &self,
1689        request: &AwsRequest,
1690    ) -> Result<AwsResponse, AwsServiceError> {
1691        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1692
1693        let mut state = self.state.write();
1694
1695        if db_parameter_group_name.starts_with("default.") {
1696            return Err(AwsServiceError::aws_error(
1697                StatusCode::BAD_REQUEST,
1698                "InvalidParameterValue",
1699                "Cannot delete default parameter groups.",
1700            ));
1701        }
1702
1703        if state
1704            .parameter_groups
1705            .remove(&db_parameter_group_name)
1706            .is_none()
1707        {
1708            return Err(AwsServiceError::aws_error(
1709                StatusCode::NOT_FOUND,
1710                "DBParameterGroupNotFound",
1711                format!("DBParameterGroup {db_parameter_group_name} not found."),
1712            ));
1713        }
1714
1715        Ok(AwsResponse::xml(
1716            StatusCode::OK,
1717            xml_wrap("DeleteDBParameterGroup", "", &request.request_id),
1718        ))
1719    }
1720
1721    fn modify_db_parameter_group(
1722        &self,
1723        request: &AwsRequest,
1724    ) -> Result<AwsResponse, AwsServiceError> {
1725        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1726
1727        let mut state = self.state.write();
1728
1729        let parameter_group = state
1730            .parameter_groups
1731            .get_mut(&db_parameter_group_name)
1732            .ok_or_else(|| {
1733                AwsServiceError::aws_error(
1734                    StatusCode::NOT_FOUND,
1735                    "DBParameterGroupNotFound",
1736                    format!("DBParameterGroup {db_parameter_group_name} not found."),
1737                )
1738            })?;
1739
1740        if let Some(new_description) = optional_param(request, "Description") {
1741            parameter_group.description = new_description;
1742        }
1743
1744        let parameter_group_clone = parameter_group.clone();
1745
1746        Ok(AwsResponse::xml(
1747            StatusCode::OK,
1748            xml_wrap(
1749                "ModifyDBParameterGroup",
1750                &format!(
1751                    "<DBParameterGroupName>{}</DBParameterGroupName>",
1752                    xml_escape(&parameter_group_clone.db_parameter_group_name)
1753                ),
1754                &request.request_id,
1755            ),
1756        ))
1757    }
1758}
1759
1760fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
1761    req.query_params
1762        .get(name)
1763        .cloned()
1764        .filter(|value| !value.is_empty())
1765}
1766
1767fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
1768    optional_param(req, name).ok_or_else(|| {
1769        AwsServiceError::aws_error(
1770            StatusCode::BAD_REQUEST,
1771            "MissingParameter",
1772            format!("The request must contain the parameter {name}."),
1773        )
1774    })
1775}
1776
1777fn required_i32_param(req: &AwsRequest, name: &str) -> Result<i32, AwsServiceError> {
1778    let value = required_param(req, name)?;
1779    value.parse::<i32>().map_err(|_| {
1780        AwsServiceError::aws_error(
1781            StatusCode::BAD_REQUEST,
1782            "InvalidParameterValue",
1783            format!("Parameter {name} must be a valid integer."),
1784        )
1785    })
1786}
1787
1788fn optional_i32_param(req: &AwsRequest, name: &str) -> Result<Option<i32>, AwsServiceError> {
1789    optional_param(req, name)
1790        .map(|value| {
1791            value.parse::<i32>().map_err(|_| {
1792                AwsServiceError::aws_error(
1793                    StatusCode::BAD_REQUEST,
1794                    "InvalidParameterValue",
1795                    format!("Parameter {name} must be a valid integer."),
1796                )
1797            })
1798        })
1799        .transpose()
1800}
1801
1802fn parse_tags(req: &AwsRequest) -> Result<Vec<RdsTag>, AwsServiceError> {
1803    let mut tags = Vec::new();
1804    for index in 1.. {
1805        let key_name = format!("Tags.Tag.{index}.Key");
1806        let value_name = format!("Tags.Tag.{index}.Value");
1807        let key = optional_param(req, &key_name);
1808        let value = optional_param(req, &value_name);
1809
1810        match (key, value) {
1811            (Some(key), Some(value)) => tags.push(RdsTag { key, value }),
1812            (None, None) => break,
1813            _ => {
1814                return Err(AwsServiceError::aws_error(
1815                    StatusCode::BAD_REQUEST,
1816                    "InvalidParameterValue",
1817                    "Each tag must include both Key and Value.",
1818                ));
1819            }
1820        }
1821    }
1822
1823    Ok(tags)
1824}
1825
1826fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
1827    let mut keys = Vec::new();
1828    for index in 1.. {
1829        let key_name = format!("TagKeys.member.{index}");
1830        match optional_param(req, &key_name) {
1831            Some(key) => keys.push(key),
1832            None => break,
1833        }
1834    }
1835
1836    Ok(keys)
1837}
1838
1839fn parse_subnet_ids(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
1840    let mut subnet_ids = Vec::new();
1841    for index in 1.. {
1842        let subnet_id_name = format!("SubnetIds.SubnetIdentifier.{index}");
1843        match optional_param(req, &subnet_id_name) {
1844            Some(subnet_id) => subnet_ids.push(subnet_id),
1845            None => break,
1846        }
1847    }
1848
1849    Ok(subnet_ids)
1850}
1851
1852fn parse_vpc_security_group_ids(req: &AwsRequest) -> Vec<String> {
1853    let mut security_group_ids = Vec::new();
1854    for index in 1.. {
1855        let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
1856        match optional_param(req, &sg_id_name) {
1857            Some(sg_id) => security_group_ids.push(sg_id),
1858            None => break,
1859        }
1860    }
1861
1862    // If no security groups provided, return a default one
1863    if security_group_ids.is_empty() {
1864        security_group_ids.push("sg-default".to_string());
1865    }
1866
1867    security_group_ids
1868}
1869
1870fn query_param_prefix_exists(req: &AwsRequest, prefix: &str) -> bool {
1871    req.query_params.keys().any(|key| key.starts_with(prefix))
1872}
1873
1874fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
1875    value
1876        .map(|raw| match raw {
1877            "true" | "True" | "TRUE" => Ok(true),
1878            "false" | "False" | "FALSE" => Ok(false),
1879            _ => Err(AwsServiceError::aws_error(
1880                StatusCode::BAD_REQUEST,
1881                "InvalidParameterValue",
1882                format!("Boolean parameter value '{raw}' is invalid."),
1883            )),
1884        })
1885        .transpose()
1886}
1887
/// One page of results produced by `paginate`.
struct PaginationResult<T> {
    /// The items that belong to this page, in listing order.
    items: Vec<T>,
    /// Marker to pass back for the following page; `None` on the last page.
    next_marker: Option<String>,
}
1892
1893fn paginate<T, F>(
1894    mut items: Vec<T>,
1895    marker: Option<String>,
1896    max_records: Option<String>,
1897    get_id: F,
1898) -> Result<PaginationResult<T>, AwsServiceError>
1899where
1900    F: Fn(&T) -> &str,
1901{
1902    // Parse max_records with default 100, max 100
1903    let max = if let Some(max_str) = max_records {
1904        let parsed = max_str.parse::<i32>().map_err(|_| {
1905            AwsServiceError::aws_error(
1906                StatusCode::BAD_REQUEST,
1907                "InvalidParameterValue",
1908                "MaxRecords must be a valid integer.",
1909            )
1910        })?;
1911        if !(1..=100).contains(&parsed) {
1912            return Err(AwsServiceError::aws_error(
1913                StatusCode::BAD_REQUEST,
1914                "InvalidParameterValue",
1915                "MaxRecords must be between 1 and 100.",
1916            ));
1917        }
1918        parsed as usize
1919    } else {
1920        100
1921    };
1922
1923    // Decode marker to get starting identifier
1924    let start_id = if let Some(encoded_marker) = marker {
1925        let decoded = BASE64.decode(encoded_marker.as_bytes()).map_err(|_| {
1926            AwsServiceError::aws_error(
1927                StatusCode::BAD_REQUEST,
1928                "InvalidParameterValue",
1929                "Marker is invalid.",
1930            )
1931        })?;
1932        let id = String::from_utf8(decoded).map_err(|_| {
1933            AwsServiceError::aws_error(
1934                StatusCode::BAD_REQUEST,
1935                "InvalidParameterValue",
1936                "Marker is invalid.",
1937            )
1938        })?;
1939        Some(id)
1940    } else {
1941        None
1942    };
1943
1944    // Find starting position
1945    let start_index = if let Some(ref start_id) = start_id {
1946        items
1947            .iter()
1948            .position(|item| get_id(item) == start_id)
1949            .map(|pos| pos + 1) // Start after the marker
1950            .unwrap_or(items.len()) // If not found, return empty result
1951    } else {
1952        0
1953    };
1954
1955    // Take items from start_index
1956    let total_items = items.len();
1957    let end_index = std::cmp::min(start_index + max, total_items);
1958    let paginated_items: Vec<T> = items.drain(start_index..end_index).collect();
1959
1960    // Create next marker if there are more items
1961    let next_marker = if end_index < total_items {
1962        paginated_items
1963            .last()
1964            .map(|item| BASE64.encode(get_id(item).as_bytes()))
1965    } else {
1966        None
1967    };
1968
1969    Ok(PaginationResult {
1970        items: paginated_items,
1971        next_marker,
1972    })
1973}
1974
1975fn validate_create_request(
1976    db_instance_identifier: &str,
1977    allocated_storage: i32,
1978    db_instance_class: &str,
1979    engine: &str,
1980    engine_version: &str,
1981    port: i32,
1982) -> Result<(), AwsServiceError> {
1983    if allocated_storage <= 0 {
1984        return Err(AwsServiceError::aws_error(
1985            StatusCode::BAD_REQUEST,
1986            "InvalidParameterValue",
1987            "AllocatedStorage must be greater than zero.",
1988        ));
1989    }
1990    if port <= 0 {
1991        return Err(AwsServiceError::aws_error(
1992            StatusCode::BAD_REQUEST,
1993            "InvalidParameterValue",
1994            "Port must be greater than zero.",
1995        ));
1996    }
1997    if !db_instance_identifier
1998        .chars()
1999        .all(|ch| ch.is_ascii_alphanumeric() || ch == '-')
2000    {
2001        return Err(AwsServiceError::aws_error(
2002            StatusCode::BAD_REQUEST,
2003            "InvalidParameterValue",
2004            "DBInstanceIdentifier must contain only alphanumeric characters or hyphens.",
2005        ));
2006    }
2007    // Validate engine
2008    let supported_engines = ["postgres", "mysql", "mariadb"];
2009    if !supported_engines.contains(&engine) {
2010        return Err(AwsServiceError::aws_error(
2011            StatusCode::BAD_REQUEST,
2012            "InvalidParameterValue",
2013            format!("Engine '{}' is not supported.", engine),
2014        ));
2015    }
2016
2017    // Validate engine version
2018    let supported_versions = match engine {
2019        "postgres" => vec!["16.3", "15.5", "14.10", "13.13"],
2020        "mysql" => vec!["8.0.35", "8.0.28", "5.7.44"],
2021        "mariadb" => vec!["10.11.6", "10.6.16"],
2022        _ => vec![],
2023    };
2024
2025    if !supported_versions.contains(&engine_version) {
2026        return Err(AwsServiceError::aws_error(
2027            StatusCode::BAD_REQUEST,
2028            "InvalidParameterValue",
2029            format!("EngineVersion '{engine_version}' is not supported yet."),
2030        ));
2031    }
2032    validate_db_instance_class(db_instance_class)?;
2033    Ok(())
2034}
2035
2036fn validate_db_instance_class(db_instance_class: &str) -> Result<(), AwsServiceError> {
2037    let supported_classes = [
2038        "db.t3.micro",
2039        "db.t3.small",
2040        "db.t3.medium",
2041        "db.t3.large",
2042        "db.t4g.micro",
2043        "db.t4g.small",
2044        "db.m5.large",
2045    ];
2046    if !supported_classes.contains(&db_instance_class) {
2047        return Err(AwsServiceError::aws_error(
2048            StatusCode::BAD_REQUEST,
2049            "InvalidParameterValue",
2050            format!("DBInstanceClass '{}' is not supported.", db_instance_class),
2051        ));
2052    }
2053    Ok(())
2054}
2055
2056fn filter_engine_versions(
2057    versions: &[EngineVersionInfo],
2058    engine: &Option<String>,
2059    engine_version: &Option<String>,
2060    family: &Option<String>,
2061) -> Vec<EngineVersionInfo> {
2062    versions
2063        .iter()
2064        .filter(|candidate| {
2065            engine
2066                .as_ref()
2067                .is_none_or(|expected| candidate.engine == *expected)
2068        })
2069        .filter(|candidate| {
2070            engine_version
2071                .as_ref()
2072                .is_none_or(|expected| candidate.engine_version == *expected)
2073        })
2074        .filter(|candidate| {
2075            family
2076                .as_ref()
2077                .is_none_or(|expected| candidate.db_parameter_group_family == *expected)
2078        })
2079        .cloned()
2080        .collect()
2081}
2082
2083fn filter_orderable_options(
2084    options: &[OrderableDbInstanceOption],
2085    engine: &Option<String>,
2086    engine_version: &Option<String>,
2087    db_instance_class: &Option<String>,
2088    license_model: &Option<String>,
2089    vpc: Option<bool>,
2090) -> Vec<OrderableDbInstanceOption> {
2091    options
2092        .iter()
2093        .filter(|candidate| {
2094            engine
2095                .as_ref()
2096                .is_none_or(|expected| candidate.engine == *expected)
2097        })
2098        .filter(|candidate| {
2099            engine_version
2100                .as_ref()
2101                .is_none_or(|expected| candidate.engine_version == *expected)
2102        })
2103        .filter(|candidate| {
2104            db_instance_class
2105                .as_ref()
2106                .is_none_or(|expected| candidate.db_instance_class == *expected)
2107        })
2108        .filter(|candidate| {
2109            license_model
2110                .as_ref()
2111                .is_none_or(|expected| candidate.license_model == *expected)
2112        })
2113        .filter(|_| vpc.unwrap_or(true))
2114        .cloned()
2115        .collect()
2116}
2117
2118fn xml_wrap(action: &str, inner: &str, request_id: &str) -> String {
2119    format!(
2120        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
2121         <{action}Response xmlns=\"{RDS_NS}\">\
2122         <{action}Result>{inner}</{action}Result>\
2123         <ResponseMetadata><RequestId>{request_id}</RequestId></ResponseMetadata>\
2124         </{action}Response>"
2125    )
2126}
2127
2128fn engine_version_xml(version: &EngineVersionInfo) -> String {
2129    format!(
2130        "<DBEngineVersion>\
2131         <Engine>{}</Engine>\
2132         <EngineVersion>{}</EngineVersion>\
2133         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2134         <DBEngineDescription>{}</DBEngineDescription>\
2135         <DBEngineVersionDescription>{}</DBEngineVersionDescription>\
2136         <Status>{}</Status>\
2137         </DBEngineVersion>",
2138        xml_escape(&version.engine),
2139        xml_escape(&version.engine_version),
2140        xml_escape(&version.db_parameter_group_family),
2141        xml_escape(&version.db_engine_description),
2142        xml_escape(&version.db_engine_version_description),
2143        xml_escape(&version.status),
2144    )
2145}
2146
2147fn orderable_option_xml(option: &OrderableDbInstanceOption) -> String {
2148    format!(
2149        "<OrderableDBInstanceOption>\
2150         <Engine>{}</Engine>\
2151         <EngineVersion>{}</EngineVersion>\
2152         <DBInstanceClass>{}</DBInstanceClass>\
2153         <LicenseModel>{}</LicenseModel>\
2154         <AvailabilityZones><AvailabilityZone><Name>us-east-1a</Name></AvailabilityZone></AvailabilityZones>\
2155         <MultiAZCapable>true</MultiAZCapable>\
2156         <ReadReplicaCapable>true</ReadReplicaCapable>\
2157         <Vpc>true</Vpc>\
2158         <SupportsStorageEncryption>true</SupportsStorageEncryption>\
2159         <StorageType>{}</StorageType>\
2160         <SupportsIops>false</SupportsIops>\
2161         <MinStorageSize>{}</MinStorageSize>\
2162         <MaxStorageSize>{}</MaxStorageSize>\
2163         <SupportsIAMDatabaseAuthentication>true</SupportsIAMDatabaseAuthentication>\
2164         </OrderableDBInstanceOption>",
2165        xml_escape(&option.engine),
2166        xml_escape(&option.engine_version),
2167        xml_escape(&option.db_instance_class),
2168        xml_escape(&option.license_model),
2169        xml_escape(&option.storage_type),
2170        option.min_storage_size,
2171        option.max_storage_size,
2172    )
2173}
2174
2175fn tag_xml(tag: &RdsTag) -> String {
2176    format!(
2177        "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2178        xml_escape(&tag.key),
2179        xml_escape(&tag.value),
2180    )
2181}
2182
2183fn db_instance_xml(instance: &DbInstance, status_override: Option<&str>) -> String {
2184    let status = status_override.unwrap_or(&instance.db_instance_status);
2185    let db_name_xml = instance
2186        .db_name
2187        .as_ref()
2188        .map(|db_name| format!("<DBName>{}</DBName>", xml_escape(db_name)))
2189        .unwrap_or_default();
2190
2191    let read_replica_source_xml = instance
2192        .read_replica_source_db_instance_identifier
2193        .as_ref()
2194        .map(|source| {
2195            format!(
2196                "<ReadReplicaSourceDBInstanceIdentifier>{}</ReadReplicaSourceDBInstanceIdentifier>",
2197                xml_escape(source)
2198            )
2199        })
2200        .unwrap_or_default();
2201
2202    let read_replica_identifiers_xml = if instance.read_replica_db_instance_identifiers.is_empty() {
2203        "<ReadReplicaDBInstanceIdentifiers/>".to_string()
2204    } else {
2205        format!(
2206            "<ReadReplicaDBInstanceIdentifiers>{}</ReadReplicaDBInstanceIdentifiers>",
2207            instance
2208                .read_replica_db_instance_identifiers
2209                .iter()
2210                .map(|id| format!(
2211                    "<ReadReplicaDBInstanceIdentifier>{}</ReadReplicaDBInstanceIdentifier>",
2212                    xml_escape(id)
2213                ))
2214                .collect::<String>()
2215        )
2216    };
2217
2218    let vpc_security_groups_xml = if instance.vpc_security_group_ids.is_empty() {
2219        "<VpcSecurityGroups/>".to_string()
2220    } else {
2221        format!(
2222            "<VpcSecurityGroups>{}</VpcSecurityGroups>",
2223            instance
2224                .vpc_security_group_ids
2225                .iter()
2226                .map(|sg_id| format!(
2227                    "<VpcSecurityGroupMembership>\
2228                     <VpcSecurityGroupId>{}</VpcSecurityGroupId>\
2229                     <Status>active</Status>\
2230                     </VpcSecurityGroupMembership>",
2231                    xml_escape(sg_id)
2232                ))
2233                .collect::<String>()
2234        )
2235    };
2236
2237    let db_parameter_groups_xml = match &instance.db_parameter_group_name {
2238        Some(pg_name) => format!(
2239            "<DBParameterGroups>\
2240             <DBParameterGroup>\
2241             <DBParameterGroupName>{}</DBParameterGroupName>\
2242             <ParameterApplyStatus>in-sync</ParameterApplyStatus>\
2243             </DBParameterGroup>\
2244             </DBParameterGroups>",
2245            xml_escape(pg_name)
2246        ),
2247        None => "<DBParameterGroups/>".to_string(),
2248    };
2249
2250    let option_group_memberships_xml = match &instance.option_group_name {
2251        Some(og_name) => format!(
2252            "<OptionGroupMemberships>\
2253             <OptionGroupMembership>\
2254             <OptionGroupName>{}</OptionGroupName>\
2255             <Status>in-sync</Status>\
2256             </OptionGroupMembership>\
2257             </OptionGroupMemberships>",
2258            xml_escape(og_name)
2259        ),
2260        None => "<OptionGroupMemberships/>".to_string(),
2261    };
2262
2263    let pending_modified_values_xml = if let Some(ref pending) = instance.pending_modified_values {
2264        let mut fields = Vec::new();
2265        if let Some(ref class) = pending.db_instance_class {
2266            fields.push(format!(
2267                "<DBInstanceClass>{}</DBInstanceClass>",
2268                xml_escape(class)
2269            ));
2270        }
2271        if let Some(allocated_storage) = pending.allocated_storage {
2272            fields.push(format!(
2273                "<AllocatedStorage>{}</AllocatedStorage>",
2274                allocated_storage
2275            ));
2276        }
2277        if let Some(backup_retention_period) = pending.backup_retention_period {
2278            fields.push(format!(
2279                "<BackupRetentionPeriod>{}</BackupRetentionPeriod>",
2280                backup_retention_period
2281            ));
2282        }
2283        if let Some(multi_az) = pending.multi_az {
2284            fields.push(format!(
2285                "<MultiAZ>{}</MultiAZ>",
2286                if multi_az { "true" } else { "false" }
2287            ));
2288        }
2289        if let Some(ref engine_version) = pending.engine_version {
2290            fields.push(format!(
2291                "<EngineVersion>{}</EngineVersion>",
2292                xml_escape(engine_version)
2293            ));
2294        }
2295        if pending.master_user_password.is_some() {
2296            fields.push("<MasterUserPassword>****</MasterUserPassword>".to_string());
2297        }
2298        if !fields.is_empty() {
2299            format!(
2300                "<PendingModifiedValues>{}</PendingModifiedValues>",
2301                fields.join("")
2302            )
2303        } else {
2304            String::new()
2305        }
2306    } else {
2307        String::new()
2308    };
2309
2310    format!(
2311        "<DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2312         <DBInstanceClass>{}</DBInstanceClass>\
2313         <Engine>{}</Engine>\
2314         <DBInstanceStatus>{}</DBInstanceStatus>\
2315         <MasterUsername>{}</MasterUsername>\
2316         {}\
2317         <Endpoint><Address>{}</Address><Port>{}</Port></Endpoint>\
2318         <AllocatedStorage>{}</AllocatedStorage>\
2319         <InstanceCreateTime>{}</InstanceCreateTime>\
2320         <PreferredBackupWindow>{}</PreferredBackupWindow>\
2321         <BackupRetentionPeriod>{}</BackupRetentionPeriod>\
2322         <DBSecurityGroups/>\
2323         {}\
2324         {}\
2325         <AvailabilityZone>us-east-1a</AvailabilityZone>\
2326         <LatestRestorableTime>{}</LatestRestorableTime>\
2327         <PreferredMaintenanceWindow>sun:00:00-sun:00:30</PreferredMaintenanceWindow>\
2328         <MultiAZ>{}</MultiAZ>\
2329         <EngineVersion>{}</EngineVersion>\
2330         <AutoMinorVersionUpgrade>true</AutoMinorVersionUpgrade>\
2331         {}\
2332         {}\
2333         <LicenseModel>postgresql-license</LicenseModel>\
2334         {}\
2335         <PubliclyAccessible>{}</PubliclyAccessible>\
2336         <StorageType>gp2</StorageType>\
2337         <DbInstancePort>{}</DbInstancePort>\
2338         <StorageEncrypted>false</StorageEncrypted>\
2339         <DbiResourceId>{}</DbiResourceId>\
2340         <DeletionProtection>{}</DeletionProtection>\
2341         {}\
2342         <DBInstanceArn>{}</DBInstanceArn>",
2343        xml_escape(&instance.db_instance_identifier),
2344        xml_escape(&instance.db_instance_class),
2345        xml_escape(&instance.engine),
2346        xml_escape(status),
2347        xml_escape(&instance.master_username),
2348        db_name_xml,
2349        xml_escape(&instance.endpoint_address),
2350        instance.port,
2351        instance.allocated_storage,
2352        instance.created_at.to_rfc3339(),
2353        xml_escape(&instance.preferred_backup_window),
2354        instance.backup_retention_period,
2355        vpc_security_groups_xml,
2356        db_parameter_groups_xml,
2357        instance.latest_restorable_time.to_rfc3339(),
2358        if instance.multi_az { "true" } else { "false" },
2359        xml_escape(&instance.engine_version),
2360        read_replica_identifiers_xml,
2361        read_replica_source_xml,
2362        option_group_memberships_xml,
2363        if instance.publicly_accessible {
2364            "true"
2365        } else {
2366            "false"
2367        },
2368        instance.port,
2369        xml_escape(&instance.dbi_resource_id),
2370        if instance.deletion_protection {
2371            "true"
2372        } else {
2373            "false"
2374        },
2375        pending_modified_values_xml,
2376        xml_escape(&instance.db_instance_arn),
2377    )
2378}
2379
2380fn db_snapshot_xml(snapshot: &DbSnapshot) -> String {
2381    format!(
2382        "<DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\
2383         <DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2384         <SnapshotCreateTime>{}</SnapshotCreateTime>\
2385         <Engine>{}</Engine>\
2386         <EngineVersion>{}</EngineVersion>\
2387         <AllocatedStorage>{}</AllocatedStorage>\
2388         <Status>{}</Status>\
2389         <Port>{}</Port>\
2390         <MasterUsername>{}</MasterUsername>\
2391         {}\
2392         <DbiResourceId>{}</DbiResourceId>\
2393         <SnapshotType>{}</SnapshotType>\
2394         <DBSnapshotArn>{}</DBSnapshotArn>",
2395        xml_escape(&snapshot.db_snapshot_identifier),
2396        xml_escape(&snapshot.db_instance_identifier),
2397        snapshot.snapshot_create_time.to_rfc3339(),
2398        xml_escape(&snapshot.engine),
2399        xml_escape(&snapshot.engine_version),
2400        snapshot.allocated_storage,
2401        xml_escape(&snapshot.status),
2402        snapshot.port,
2403        xml_escape(&snapshot.master_username),
2404        snapshot
2405            .db_name
2406            .as_ref()
2407            .map(|name| format!("<DBName>{}</DBName>", xml_escape(name)))
2408            .unwrap_or_default(),
2409        xml_escape(&snapshot.dbi_resource_id),
2410        xml_escape(&snapshot.snapshot_type),
2411        xml_escape(&snapshot.db_snapshot_arn),
2412    )
2413}
2414
2415fn db_subnet_group_xml(subnet_group: &DbSubnetGroup) -> String {
2416    let subnets_xml = subnet_group
2417        .subnet_ids
2418        .iter()
2419        .zip(&subnet_group.subnet_availability_zones)
2420        .map(|(subnet_id, az)| {
2421            format!(
2422                "<Subnet>\
2423                 <SubnetIdentifier>{}</SubnetIdentifier>\
2424                 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
2425                 <SubnetStatus>Active</SubnetStatus>\
2426                 </Subnet>",
2427                xml_escape(subnet_id),
2428                xml_escape(az)
2429            )
2430        })
2431        .collect::<String>();
2432
2433    format!(
2434        "<DBSubnetGroupName>{}</DBSubnetGroupName>\
2435         <DBSubnetGroupDescription>{}</DBSubnetGroupDescription>\
2436         <VpcId>{}</VpcId>\
2437         <SubnetGroupStatus>Complete</SubnetGroupStatus>\
2438         <Subnets>{}</Subnets>\
2439         <DBSubnetGroupArn>{}</DBSubnetGroupArn>",
2440        xml_escape(&subnet_group.db_subnet_group_name),
2441        xml_escape(&subnet_group.db_subnet_group_description),
2442        xml_escape(&subnet_group.vpc_id),
2443        subnets_xml,
2444        xml_escape(&subnet_group.db_subnet_group_arn),
2445    )
2446}
2447
2448fn db_parameter_group_xml(parameter_group: &DbParameterGroup) -> String {
2449    format!(
2450        "<DBParameterGroupName>{}</DBParameterGroupName>\
2451         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2452         <Description>{}</Description>\
2453         <DBParameterGroupArn>{}</DBParameterGroupArn>",
2454        xml_escape(&parameter_group.db_parameter_group_name),
2455        xml_escape(&parameter_group.db_parameter_group_family),
2456        xml_escape(&parameter_group.description),
2457        xml_escape(&parameter_group.db_parameter_group_arn),
2458    )
2459}
2460
2461fn db_instance_not_found(identifier: &str) -> AwsServiceError {
2462    AwsServiceError::aws_error(
2463        StatusCode::NOT_FOUND,
2464        "DBInstanceNotFound",
2465        format!("DBInstance {} not found.", identifier),
2466    )
2467}
2468
2469fn db_snapshot_not_found(identifier: &str) -> AwsServiceError {
2470    AwsServiceError::aws_error(
2471        StatusCode::NOT_FOUND,
2472        "DBSnapshotNotFound",
2473        format!("DBSnapshot {} not found.", identifier),
2474    )
2475}
2476
2477fn db_instance_not_found_by_arn(resource_name: &str) -> AwsServiceError {
2478    AwsServiceError::aws_error(
2479        StatusCode::NOT_FOUND,
2480        "DBInstanceNotFound",
2481        format!("DBInstance {resource_name} not found."),
2482    )
2483}
2484
2485fn find_instance_by_arn<'a>(
2486    state: &'a crate::state::RdsState,
2487    resource_name: &str,
2488) -> Result<&'a DbInstance, AwsServiceError> {
2489    state
2490        .instances
2491        .values()
2492        .find(|instance| instance.db_instance_arn == resource_name)
2493        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2494}
2495
2496fn find_instance_by_arn_mut<'a>(
2497    state: &'a mut crate::state::RdsState,
2498    resource_name: &str,
2499) -> Result<&'a mut DbInstance, AwsServiceError> {
2500    state
2501        .instances
2502        .values_mut()
2503        .find(|instance| instance.db_instance_arn == resource_name)
2504        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2505}
2506
2507fn merge_tags(existing: &mut Vec<RdsTag>, incoming: &[RdsTag]) {
2508    for tag in incoming {
2509        if let Some(existing_tag) = existing
2510            .iter_mut()
2511            .find(|candidate| candidate.key == tag.key)
2512        {
2513            existing_tag.value = tag.value.clone();
2514        } else {
2515            existing.push(tag.clone());
2516        }
2517    }
2518}
2519
2520fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
2521    match error {
2522        RuntimeError::Unavailable => AwsServiceError::aws_error(
2523            StatusCode::SERVICE_UNAVAILABLE,
2524            "InvalidParameterValue",
2525            "Docker/Podman is required for RDS DB instances but is not available",
2526        ),
2527        RuntimeError::ContainerStartFailed(message) => AwsServiceError::aws_error(
2528            StatusCode::INTERNAL_SERVER_ERROR,
2529            "InternalFailure",
2530            message,
2531        ),
2532    }
2533}
2534
#[cfg(test)]
mod tests {
    //! Unit tests for the RDS service helpers and one end-to-end handler call.

    use std::collections::HashMap;
    use std::sync::Arc;

    use bytes::Bytes;
    use chrono::Utc;
    use http::{HeaderMap, Method};
    use parking_lot::RwLock;
    use uuid::Uuid;

    use super::{
        db_instance_xml, filter_engine_versions, filter_orderable_options, merge_tags,
        optional_i32_param, parse_tag_keys, parse_tags, validate_create_request, RdsService,
    };
    use crate::state::{
        default_engine_versions, default_orderable_options, DbInstance, RdsState, RdsTag,
    };
    use fakecloud_core::service::{AwsRequest, AwsService};

    /// Shorthand for building an owned tag from string slices.
    fn tag(key: &str, value: &str) -> RdsTag {
        RdsTag {
            key: key.to_owned(),
            value: value.to_owned(),
        }
    }

    /// Builds a query-protocol RDS request for `action` carrying `params` as
    /// query parameters (in addition to the `Action` parameter itself).
    fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
        let mut query_params: HashMap<String, String> = HashMap::with_capacity(params.len() + 1);
        query_params.insert("Action".to_owned(), action.to_owned());
        query_params.extend(
            params
                .iter()
                .map(|&(key, value)| (key.to_owned(), value.to_owned())),
        );

        AwsRequest {
            service: "rds".to_owned(),
            action: action.to_owned(),
            region: "us-east-1".to_owned(),
            account_id: "123456789012".to_owned(),
            request_id: "test-request-id".to_owned(),
            headers: HeaderMap::new(),
            query_params,
            body: Bytes::new(),
            path_segments: vec![],
            raw_path: "/".to_owned(),
            raw_query: String::new(),
            method: Method::POST,
            is_query_protocol: true,
            access_key_id: None,
        }
    }

    #[test]
    fn filter_engine_versions_matches_requested_engine() {
        let all_versions = default_engine_versions();
        let engine = Some("postgres".to_owned());

        let matched = filter_engine_versions(&all_versions, &engine, &None, &None);

        // Every postgres version in the defaults is expected to match.
        assert_eq!(matched.len(), 4);
        assert!(matched.iter().all(|version| version.engine == "postgres"));
    }

    #[test]
    fn filter_orderable_options_respects_instance_class() {
        let all_options = default_orderable_options();
        let engine = Some("postgres".to_owned());
        let engine_version = Some("16.3".to_owned());
        let instance_class = Some("db.t3.micro".to_owned());

        let matched = filter_orderable_options(
            &all_options,
            &engine,
            &engine_version,
            &instance_class,
            &None,
            Some(true),
        );

        assert_eq!(matched.len(), 1);
        assert_eq!(matched[0].db_instance_class, "db.t3.micro");
    }

    #[test]
    fn validate_create_request_rejects_unsupported_engine() {
        let outcome = validate_create_request("test-db", 20, "db.t3.micro", "mysql", "16.3", 5432);

        let error = outcome.expect_err("unsupported engine");

        assert_eq!(error.code(), "InvalidParameterValue");
    }

    #[test]
    fn optional_i32_param_rejects_invalid_integer() {
        let req = request("CreateDBInstance", &[("Port", "not-a-number")]);

        let error = optional_i32_param(&req, "Port").expect_err("invalid port");

        assert_eq!(error.code(), "InvalidParameterValue");
    }

    #[test]
    fn db_instance_xml_renders_endpoint_and_status() {
        let created_at = Utc::now();
        let instance = DbInstance {
            db_instance_identifier: "test-db".to_owned(),
            db_instance_arn: "arn:aws:rds:us-east-1:123456789012:db:test-db".to_owned(),
            db_instance_class: "db.t3.micro".to_owned(),
            engine: "postgres".to_owned(),
            engine_version: "16.3".to_owned(),
            db_instance_status: "available".to_owned(),
            master_username: "admin".to_owned(),
            db_name: Some("appdb".to_owned()),
            endpoint_address: "127.0.0.1".to_owned(),
            port: 15432,
            allocated_storage: 20,
            publicly_accessible: true,
            deletion_protection: false,
            created_at,
            dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
            master_user_password: "secret123".to_owned(),
            container_id: "container".to_owned(),
            host_port: 15432,
            tags: Vec::new(),
            read_replica_source_db_instance_identifier: None,
            read_replica_db_instance_identifiers: Vec::new(),
            vpc_security_group_ids: vec!["sg-12345678".to_owned()],
            db_parameter_group_name: Some("default.postgres16".to_owned()),
            backup_retention_period: 1,
            preferred_backup_window: "03:00-04:00".to_owned(),
            latest_restorable_time: created_at,
            option_group_name: None,
            multi_az: false,
            pending_modified_values: None,
        };

        // The stored status is "available"; the override passed here is what
        // the rendered document is expected to show.
        let rendered = db_instance_xml(&instance, Some("creating"));

        let expected_fragments = [
            "<DBInstanceIdentifier>test-db</DBInstanceIdentifier>",
            "<DBInstanceStatus>creating</DBInstanceStatus>",
            "<Address>127.0.0.1</Address><Port>15432</Port>",
        ];
        for fragment in expected_fragments {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn parse_tags_reads_rds_query_shape() {
        let req = request(
            "AddTagsToResource",
            &[
                ("Tags.Tag.1.Key", "env"),
                ("Tags.Tag.1.Value", "dev"),
                ("Tags.Tag.2.Key", "team"),
                ("Tags.Tag.2.Value", "core"),
            ],
        );

        let parsed = parse_tags(&req).expect("tags");

        assert_eq!(parsed, vec![tag("env", "dev"), tag("team", "core")]);
    }

    #[test]
    fn parse_tag_keys_reads_member_shape() {
        let req = request(
            "RemoveTagsFromResource",
            &[("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")],
        );

        let parsed = parse_tag_keys(&req).expect("tag keys");

        assert_eq!(parsed, vec!["env".to_owned(), "team".to_owned()]);
    }

    #[test]
    fn merge_tags_updates_existing_values() {
        let mut current = vec![tag("env", "dev")];
        let incoming = [tag("env", "prod"), tag("team", "core")];

        merge_tags(&mut current, &incoming);

        assert_eq!(current.len(), 2);
        assert_eq!(current[0].value, "prod");
        assert_eq!(current[1].key, "team");
    }

    #[tokio::test]
    async fn describe_engine_versions_returns_xml_body() {
        let state = Arc::new(RwLock::new(RdsState::new("123456789012", "us-east-1")));
        let service = RdsService::new(state);
        let req = request("DescribeDBEngineVersions", &[("Engine", "postgres")]);

        let response = service.handle(req).await.expect("response");
        let body = String::from_utf8(response.body.to_vec()).expect("utf8");

        assert!(body.contains("<DescribeDBEngineVersionsResponse"));
        assert!(body.contains("<Engine>postgres</Engine>"));
        assert!(body.contains("<DBParameterGroupFamily>postgres16</DBParameterGroupFamily>"));
    }
}