Skip to main content

fakecloud_rds/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8
9use fakecloud_aws::xml::xml_escape;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11
12use crate::runtime::{RdsRuntime, RuntimeError};
13use crate::state::{
14    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
15    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsTag, SharedRdsState,
16};
17
/// XML namespace URI of the RDS API revision (2014-10-31) this service emulates.
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";

/// Names of every RDS action this service dispatches, in ascending alphabetical order.
///
/// The list is what `supported_actions` returns, and each entry has a matching arm in
/// `RdsService::handle`; keep the two in sync when adding an action.
const SUPPORTED_ACTIONS: &[&str] = &[
    // Add*
    "AddTagsToResource",
    // Create*
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    // Delete*
    "DeleteDBInstance",
    "DeleteDBParameterGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    // Describe*
    "DescribeDBEngineVersions",
    "DescribeDBInstances",
    "DescribeDBParameterGroups",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeOrderableDBInstanceOptions",
    // List*
    "ListTagsForResource",
    // Modify*
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBSubnetGroup",
    // Reboot* / Remove* / Restore*
    "RebootDBInstance",
    "RemoveTagsFromResource",
    "RestoreDBInstanceFromDBSnapshot",
];
44
45pub struct RdsService {
46    state: SharedRdsState,
47    runtime: Option<Arc<RdsRuntime>>,
48}
49
50impl RdsService {
51    pub fn new(state: SharedRdsState) -> Self {
52        Self {
53            state,
54            runtime: None,
55        }
56    }
57
58    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
59        self.runtime = Some(runtime);
60        self
61    }
62}
63
64#[async_trait]
65impl AwsService for RdsService {
66    fn service_name(&self) -> &str {
67        "rds"
68    }
69
70    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
71        match request.action.as_str() {
72            "AddTagsToResource" => self.add_tags_to_resource(&request),
73            "CreateDBInstance" => self.create_db_instance(&request).await,
74            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
75            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
76            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
77            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
78            "DeleteDBInstance" => self.delete_db_instance(&request).await,
79            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
80            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
81            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
82            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
83            "DescribeDBInstances" => self.describe_db_instances(&request),
84            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
85            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
86            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
87            "DescribeOrderableDBInstanceOptions" => {
88                self.describe_orderable_db_instance_options(&request)
89            }
90            "ListTagsForResource" => self.list_tags_for_resource(&request),
91            "ModifyDBInstance" => self.modify_db_instance(&request),
92            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
93            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
94            "RebootDBInstance" => self.reboot_db_instance(&request).await,
95            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
96            "RestoreDBInstanceFromDBSnapshot" => {
97                self.restore_db_instance_from_db_snapshot(&request).await
98            }
99            _ => Err(AwsServiceError::action_not_implemented(
100                self.service_name(),
101                &request.action,
102            )),
103        }
104    }
105
106    fn supported_actions(&self) -> &[&str] {
107        SUPPORTED_ACTIONS
108    }
109}
110
111impl RdsService {
112    async fn create_db_instance(
113        &self,
114        request: &AwsRequest,
115    ) -> Result<AwsResponse, AwsServiceError> {
116        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
117        let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
118        let db_instance_class = required_param(request, "DBInstanceClass")?;
119        let engine = required_param(request, "Engine")?;
120        let master_username = required_param(request, "MasterUsername")?;
121        let master_user_password = required_param(request, "MasterUserPassword")?;
122        let db_name = optional_param(request, "DBName");
123        let engine_version =
124            optional_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
125        let publicly_accessible =
126            parse_optional_bool(optional_param(request, "PubliclyAccessible").as_deref())?
127                .unwrap_or(true);
128        let deletion_protection =
129            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?
130                .unwrap_or(false);
131        // Default port based on engine
132        let default_port = match engine.as_str() {
133            "postgres" => 5432,
134            "mysql" | "mariadb" => 3306,
135            _ => 5432,
136        };
137        let port = optional_i32_param(request, "Port")?.unwrap_or(default_port);
138        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
139
140        // Default parameter group based on engine and version
141        let default_param_group = match engine.as_str() {
142            "postgres" => {
143                let major = engine_version.split('.').next().unwrap_or("16");
144                format!("default.postgres{}", major)
145            }
146            "mysql" => {
147                let major = if engine_version.starts_with("5.7") {
148                    "5.7"
149                } else {
150                    "8.0"
151                };
152                format!("default.mysql{}", major)
153            }
154            "mariadb" => {
155                let major = if engine_version.starts_with("10.11") {
156                    "10.11"
157                } else {
158                    "10.6"
159                };
160                format!("default.mariadb{}", major)
161            }
162            _ => "default.postgres16".to_string(),
163        };
164        let db_parameter_group_name =
165            optional_param(request, "DBParameterGroupName").or(Some(default_param_group));
166
167        let backup_retention_period =
168            optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
169        let preferred_backup_window = optional_param(request, "PreferredBackupWindow")
170            .unwrap_or_else(|| "03:00-04:00".to_string());
171        let option_group_name = optional_param(request, "OptionGroupName");
172        let multi_az =
173            parse_optional_bool(optional_param(request, "MultiAZ").as_deref())?.unwrap_or(false);
174
175        validate_create_request(
176            &db_instance_identifier,
177            allocated_storage,
178            &db_instance_class,
179            &engine,
180            &engine_version,
181            port,
182        )?;
183
184        {
185            let mut state = self.state.write();
186            if !state.begin_instance_creation(&db_instance_identifier) {
187                return Err(AwsServiceError::aws_error(
188                    StatusCode::BAD_REQUEST,
189                    "DBInstanceAlreadyExists",
190                    format!("DBInstance {} already exists.", db_instance_identifier),
191                ));
192            }
193            // Validate parameter group exists if specified by the caller
194            if let Some(ref pg_name) = db_parameter_group_name {
195                if !state.parameter_groups.contains_key(pg_name) {
196                    state.cancel_instance_creation(&db_instance_identifier);
197                    return Err(AwsServiceError::aws_error(
198                        StatusCode::NOT_FOUND,
199                        "DBParameterGroupNotFound",
200                        format!("DBParameterGroup {} not found.", pg_name),
201                    ));
202                }
203            }
204        }
205
206        let runtime = self.runtime.as_ref().ok_or_else(|| {
207            AwsServiceError::aws_error(
208                StatusCode::SERVICE_UNAVAILABLE,
209                "InvalidParameterValue",
210                "Docker/Podman is required for RDS DB instances but is not available",
211            )
212        })?;
213
214        // Default database name based on engine
215        let logical_db_name = db_name.clone().unwrap_or_else(|| match engine.as_str() {
216            "postgres" => "postgres".to_string(),
217            "mysql" | "mariadb" => "mysql".to_string(),
218            _ => "postgres".to_string(),
219        });
220        let running = runtime
221            .ensure_postgres(
222                &db_instance_identifier,
223                &engine,
224                &engine_version,
225                &master_username,
226                &master_user_password,
227                &logical_db_name,
228            )
229            .await
230            .map_err(|error| {
231                self.state
232                    .write()
233                    .cancel_instance_creation(&db_instance_identifier);
234                runtime_error_to_service_error(error)
235            })?;
236
237        let mut state = self.state.write();
238        let created_at = Utc::now();
239        let instance = DbInstance {
240            db_instance_identifier: db_instance_identifier.clone(),
241            db_instance_arn: state.db_instance_arn(&db_instance_identifier),
242            db_instance_class: db_instance_class.clone(),
243            engine: engine.clone(),
244            engine_version: engine_version.clone(),
245            db_instance_status: "available".to_string(),
246            master_username: master_username.clone(),
247            db_name: db_name.clone(),
248            endpoint_address: "127.0.0.1".to_string(),
249            port: i32::from(running.host_port),
250            allocated_storage,
251            publicly_accessible,
252            deletion_protection,
253            created_at,
254            dbi_resource_id: state.next_dbi_resource_id(),
255            master_user_password,
256            container_id: running.container_id,
257            host_port: running.host_port,
258            tags: Vec::new(),
259            read_replica_source_db_instance_identifier: None,
260            read_replica_db_instance_identifiers: Vec::new(),
261            vpc_security_group_ids,
262            db_parameter_group_name,
263            backup_retention_period,
264            preferred_backup_window,
265            latest_restorable_time: if backup_retention_period > 0 {
266                Some(created_at)
267            } else {
268                None
269            },
270            option_group_name,
271            multi_az,
272            pending_modified_values: None,
273        };
274        state.finish_instance_creation(instance.clone());
275
276        Ok(AwsResponse::xml(
277            StatusCode::OK,
278            xml_wrap(
279                "CreateDBInstance",
280                &format!(
281                    "<DBInstance>{}</DBInstance>",
282                    db_instance_xml(&instance, Some("creating"))
283                ),
284                &request.request_id,
285            ),
286        ))
287    }
288
289    async fn delete_db_instance(
290        &self,
291        request: &AwsRequest,
292    ) -> Result<AwsResponse, AwsServiceError> {
293        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
294        let skip_final_snapshot =
295            parse_optional_bool(optional_param(request, "SkipFinalSnapshot").as_deref())?
296                .unwrap_or(false);
297        let final_db_snapshot_identifier = optional_param(request, "FinalDBSnapshotIdentifier");
298
299        if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
300            return Err(AwsServiceError::aws_error(
301                StatusCode::BAD_REQUEST,
302                "InvalidParameterCombination",
303                "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
304            ));
305        }
306        if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
307            return Err(AwsServiceError::aws_error(
308                StatusCode::BAD_REQUEST,
309                "InvalidParameterCombination",
310                "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
311            ));
312        }
313
314        // Check deletion protection BEFORE creating snapshot or making any changes
315        {
316            let state = self.state.read();
317            if let Some(instance) = state.instances.get(&db_instance_identifier) {
318                if instance.deletion_protection {
319                    return Err(AwsServiceError::aws_error(
320                        StatusCode::BAD_REQUEST,
321                        "InvalidDBInstanceState",
322                        format!(
323                            "DBInstance {} cannot be deleted because deletion protection is enabled.",
324                            db_instance_identifier
325                        ),
326                    ));
327                }
328            } else {
329                return Err(db_instance_not_found(&db_instance_identifier));
330            }
331        }
332
333        // Create final snapshot if requested
334        if let Some(ref snapshot_id) = final_db_snapshot_identifier {
335            let runtime = self.runtime.as_ref().ok_or_else(|| {
336                AwsServiceError::aws_error(
337                    StatusCode::SERVICE_UNAVAILABLE,
338                    "InvalidParameterValue",
339                    "Docker/Podman is required for RDS snapshots but is not available",
340                )
341            })?;
342
343            let (instance_for_snapshot, db_name) = {
344                let state = self.state.read();
345
346                if state.snapshots.contains_key(snapshot_id) {
347                    return Err(AwsServiceError::aws_error(
348                        StatusCode::CONFLICT,
349                        "DBSnapshotAlreadyExists",
350                        format!("DBSnapshot {snapshot_id} already exists."),
351                    ));
352                }
353
354                let instance = state
355                    .instances
356                    .get(&db_instance_identifier)
357                    .cloned()
358                    .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
359
360                let default_db = default_db_name(&instance.engine);
361                let db_name = instance
362                    .db_name
363                    .as_deref()
364                    .unwrap_or(default_db)
365                    .to_string();
366
367                (instance, db_name)
368            };
369
370            let dump_data = runtime
371                .dump_database(
372                    &db_instance_identifier,
373                    &instance_for_snapshot.engine,
374                    &instance_for_snapshot.master_username,
375                    &instance_for_snapshot.master_user_password,
376                    &db_name,
377                )
378                .await
379                .map_err(runtime_error_to_service_error)?;
380
381            let mut state = self.state.write();
382
383            if state.snapshots.contains_key(snapshot_id) {
384                return Err(AwsServiceError::aws_error(
385                    StatusCode::CONFLICT,
386                    "DBSnapshotAlreadyExists",
387                    format!("DBSnapshot {snapshot_id} already exists."),
388                ));
389            }
390
391            let snapshot_arn = state.db_snapshot_arn(snapshot_id);
392
393            let snapshot = DbSnapshot {
394                db_snapshot_identifier: snapshot_id.clone(),
395                db_snapshot_arn: snapshot_arn,
396                db_instance_identifier: db_instance_identifier.clone(),
397                snapshot_create_time: Utc::now(),
398                engine: instance_for_snapshot.engine.clone(),
399                engine_version: instance_for_snapshot.engine_version.clone(),
400                allocated_storage: instance_for_snapshot.allocated_storage,
401                status: "available".to_string(),
402                port: instance_for_snapshot.port,
403                master_username: instance_for_snapshot.master_username.clone(),
404                db_name: instance_for_snapshot.db_name.clone(),
405                dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
406                snapshot_type: "manual".to_string(),
407                master_user_password: instance_for_snapshot.master_user_password.clone(),
408                tags: Vec::new(),
409                dump_data,
410            };
411
412            state.snapshots.insert(snapshot_id.clone(), snapshot);
413        }
414
415        let instance = {
416            let mut state = self.state.write();
417            let instance = state
418                .instances
419                .remove(&db_instance_identifier)
420                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
421
422            if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
423                if let Some(source) = state.instances.get_mut(source_id) {
424                    source
425                        .read_replica_db_instance_identifiers
426                        .retain(|id| id != &db_instance_identifier);
427                }
428            }
429
430            for replica_id in &instance.read_replica_db_instance_identifiers {
431                if let Some(replica) = state.instances.get_mut(replica_id) {
432                    replica.read_replica_source_db_instance_identifier = None;
433                }
434            }
435
436            instance
437        };
438
439        if let Some(runtime) = &self.runtime {
440            runtime.stop_container(&db_instance_identifier).await;
441        }
442
443        Ok(AwsResponse::xml(
444            StatusCode::OK,
445            xml_wrap(
446                "DeleteDBInstance",
447                &format!(
448                    "<DBInstance>{}</DBInstance>",
449                    db_instance_xml(&instance, Some("deleting"))
450                ),
451                &request.request_id,
452            ),
453        ))
454    }
455
456    fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
457        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
458        let db_instance_class = optional_param(request, "DBInstanceClass");
459        let deletion_protection =
460            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?;
461        let apply_immediately =
462            parse_optional_bool(optional_param(request, "ApplyImmediately").as_deref())?;
463
464        // Parse VPC security group IDs - only if at least one is provided
465        let vpc_security_group_ids = {
466            let mut ids = Vec::new();
467            for index in 1.. {
468                let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
469                match optional_param(request, &sg_id_name) {
470                    Some(sg_id) => ids.push(sg_id),
471                    None => break,
472                }
473            }
474            if ids.is_empty() {
475                None
476            } else {
477                Some(ids)
478            }
479        };
480
481        if db_instance_class.is_none()
482            && deletion_protection.is_none()
483            && vpc_security_group_ids.is_none()
484        {
485            return Err(AwsServiceError::aws_error(
486                StatusCode::BAD_REQUEST,
487                "InvalidParameterCombination",
488                "At least one supported mutable field must be provided.",
489            ));
490        }
491        if let Some(ref class) = db_instance_class {
492            validate_db_instance_class(class)?;
493        }
494
495        let mut state = self.state.write();
496        let instance = state
497            .instances
498            .get_mut(&db_instance_identifier)
499            .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
500
501        // If ApplyImmediately is false, stage changes as pending
502        if apply_immediately == Some(false) {
503            let pending = instance
504                .pending_modified_values
505                .get_or_insert(Default::default());
506            if let Some(class) = db_instance_class {
507                pending.db_instance_class = Some(class);
508            }
509            // Note: deletion_protection and vpc_security_group_ids are applied immediately
510            // regardless of ApplyImmediately flag (per AWS behavior)
511            if let Some(deletion_protection) = deletion_protection {
512                instance.deletion_protection = deletion_protection;
513            }
514            if let Some(security_group_ids) = vpc_security_group_ids {
515                instance.vpc_security_group_ids = security_group_ids;
516            }
517        } else {
518            // Apply immediately (default behavior)
519            if let Some(class) = db_instance_class {
520                instance.db_instance_class = class;
521            }
522            if let Some(deletion_protection) = deletion_protection {
523                instance.deletion_protection = deletion_protection;
524            }
525            if let Some(security_group_ids) = vpc_security_group_ids {
526                instance.vpc_security_group_ids = security_group_ids;
527            }
528        }
529
530        Ok(AwsResponse::xml(
531            StatusCode::OK,
532            xml_wrap(
533                "ModifyDBInstance",
534                &format!(
535                    "<DBInstance>{}</DBInstance>",
536                    db_instance_xml(instance, Some("modifying"))
537                ),
538                &request.request_id,
539            ),
540        ))
541    }
542
543    async fn reboot_db_instance(
544        &self,
545        request: &AwsRequest,
546    ) -> Result<AwsResponse, AwsServiceError> {
547        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
548        let force_failover =
549            parse_optional_bool(optional_param(request, "ForceFailover").as_deref())?;
550        if force_failover == Some(true) {
551            return Err(AwsServiceError::aws_error(
552                StatusCode::BAD_REQUEST,
553                "InvalidParameterCombination",
554                "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
555            ));
556        }
557
558        let instance = {
559            let state = self.state.read();
560            state
561                .instances
562                .get(&db_instance_identifier)
563                .cloned()
564                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
565        };
566
567        let runtime = self.runtime.as_ref().ok_or_else(|| {
568            AwsServiceError::aws_error(
569                StatusCode::SERVICE_UNAVAILABLE,
570                "InvalidParameterValue",
571                "Docker/Podman is required for RDS DB instances but is not available",
572            )
573        })?;
574
575        let running = runtime
576            .restart_container(
577                &db_instance_identifier,
578                &instance.engine,
579                &instance.master_username,
580                &instance.master_user_password,
581                instance
582                    .db_name
583                    .as_deref()
584                    .unwrap_or(default_db_name(&instance.engine)),
585            )
586            .await
587            .map_err(runtime_error_to_service_error)?;
588
589        let instance = {
590            let mut state = self.state.write();
591            let instance = state
592                .instances
593                .get_mut(&db_instance_identifier)
594                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
595            instance.host_port = running.host_port;
596            instance.port = i32::from(running.host_port);
597
598            // Apply any pending modifications
599            if let Some(pending) = instance.pending_modified_values.take() {
600                if let Some(class) = pending.db_instance_class {
601                    instance.db_instance_class = class;
602                }
603                if let Some(allocated_storage) = pending.allocated_storage {
604                    instance.allocated_storage = allocated_storage;
605                }
606                if let Some(backup_retention_period) = pending.backup_retention_period {
607                    instance.backup_retention_period = backup_retention_period;
608                }
609                if let Some(multi_az) = pending.multi_az {
610                    instance.multi_az = multi_az;
611                }
612                if let Some(engine_version) = pending.engine_version {
613                    instance.engine_version = engine_version;
614                }
615                if let Some(master_user_password) = pending.master_user_password {
616                    instance.master_user_password = master_user_password;
617                }
618            }
619
620            instance.clone()
621        };
622
623        Ok(AwsResponse::xml(
624            StatusCode::OK,
625            xml_wrap(
626                "RebootDBInstance",
627                &format!(
628                    "<DBInstance>{}</DBInstance>",
629                    db_instance_xml(&instance, Some("rebooting"))
630                ),
631                &request.request_id,
632            ),
633        ))
634    }
635
636    fn describe_db_engine_versions(
637        &self,
638        request: &AwsRequest,
639    ) -> Result<AwsResponse, AwsServiceError> {
640        let engine = optional_param(request, "Engine");
641        let engine_version = optional_param(request, "EngineVersion");
642        let family = optional_param(request, "DBParameterGroupFamily");
643        let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
644
645        let mut versions = filter_engine_versions(
646            &default_engine_versions(),
647            &engine,
648            &engine_version,
649            &family,
650        );
651
652        if default_only.unwrap_or(false) {
653            versions.truncate(1);
654        }
655
656        Ok(AwsResponse::xml(
657            StatusCode::OK,
658            xml_wrap(
659                "DescribeDBEngineVersions",
660                &format!(
661                    "<DBEngineVersions>{}</DBEngineVersions>",
662                    versions.iter().map(engine_version_xml).collect::<String>()
663                ),
664                &request.request_id,
665            ),
666        ))
667    }
668
669    fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
670        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
671        let marker = optional_param(request, "Marker");
672        let max_records = optional_param(request, "MaxRecords");
673
674        let state = self.state.read();
675
676        // If specific identifier requested, return just that one (no pagination)
677        if let Some(identifier) = db_instance_identifier {
678            let instance = state
679                .instances
680                .get(&identifier)
681                .cloned()
682                .ok_or_else(|| db_instance_not_found(&identifier))?;
683
684            return Ok(AwsResponse::xml(
685                StatusCode::OK,
686                xml_wrap(
687                    "DescribeDBInstances",
688                    &format!(
689                        "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
690                        db_instance_xml(&instance, None)
691                    ),
692                    &request.request_id,
693                ),
694            ));
695        }
696
697        // Get all instances sorted by created_at, then identifier
698        let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
699        instances.sort_by(|a, b| {
700            a.created_at
701                .cmp(&b.created_at)
702                .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
703        });
704
705        // Apply pagination
706        let paginated = paginate(instances, marker, max_records, |inst| {
707            &inst.db_instance_identifier
708        })?;
709
710        let marker_xml = paginated
711            .next_marker
712            .as_ref()
713            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
714            .unwrap_or_default();
715
716        Ok(AwsResponse::xml(
717            StatusCode::OK,
718            xml_wrap(
719                "DescribeDBInstances",
720                &format!(
721                    "<DBInstances>{}</DBInstances>{}",
722                    paginated
723                        .items
724                        .iter()
725                        .map(|instance| {
726                            format!(
727                                "<DBInstance>{}</DBInstance>",
728                                db_instance_xml(instance, None)
729                            )
730                        })
731                        .collect::<String>(),
732                    marker_xml
733                ),
734                &request.request_id,
735            ),
736        ))
737    }
738
739    fn describe_orderable_db_instance_options(
740        &self,
741        request: &AwsRequest,
742    ) -> Result<AwsResponse, AwsServiceError> {
743        let engine = optional_param(request, "Engine");
744        let engine_version = optional_param(request, "EngineVersion");
745        let db_instance_class = optional_param(request, "DBInstanceClass");
746        let license_model = optional_param(request, "LicenseModel");
747        let vpc = parse_optional_bool(optional_param(request, "Vpc").as_deref())?;
748
749        let options = filter_orderable_options(
750            &default_orderable_options(),
751            &engine,
752            &engine_version,
753            &db_instance_class,
754            &license_model,
755            vpc,
756        );
757
758        Ok(AwsResponse::xml(
759            StatusCode::OK,
760            xml_wrap(
761                "DescribeOrderableDBInstanceOptions",
762                &format!(
763                    "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
764                    options.iter().map(orderable_option_xml).collect::<String>()
765                ),
766                &request.request_id,
767            ),
768        ))
769    }
770
771    fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
772        let resource_name = required_param(request, "ResourceName")?;
773        let tags = parse_tags(request)?;
774
775        if tags.is_empty() {
776            return Err(AwsServiceError::aws_error(
777                StatusCode::BAD_REQUEST,
778                "MissingParameter",
779                "The request must contain the parameter Tags.",
780            ));
781        }
782
783        let mut state = self.state.write();
784        let instance = find_instance_by_arn_mut(&mut state, &resource_name)?;
785        merge_tags(&mut instance.tags, &tags);
786
787        Ok(AwsResponse::xml(
788            StatusCode::OK,
789            xml_wrap("AddTagsToResource", "", &request.request_id),
790        ))
791    }
792
793    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
794        let resource_name = required_param(request, "ResourceName")?;
795        if query_param_prefix_exists(request, "Filters.") {
796            return Err(AwsServiceError::aws_error(
797                StatusCode::BAD_REQUEST,
798                "InvalidParameterValue",
799                "Filters are not yet supported for ListTagsForResource.",
800            ));
801        }
802
803        let state = self.state.read();
804        let instance = find_instance_by_arn(&state, &resource_name)?;
805        let tag_xml = instance.tags.iter().map(tag_xml).collect::<String>();
806
807        Ok(AwsResponse::xml(
808            StatusCode::OK,
809            xml_wrap(
810                "ListTagsForResource",
811                &format!("<TagList>{tag_xml}</TagList>"),
812                &request.request_id,
813            ),
814        ))
815    }
816
817    fn remove_tags_from_resource(
818        &self,
819        request: &AwsRequest,
820    ) -> Result<AwsResponse, AwsServiceError> {
821        let resource_name = required_param(request, "ResourceName")?;
822        let tag_keys = parse_tag_keys(request)?;
823
824        if tag_keys.is_empty() {
825            return Err(AwsServiceError::aws_error(
826                StatusCode::BAD_REQUEST,
827                "MissingParameter",
828                "The request must contain the parameter TagKeys.",
829            ));
830        }
831
832        let mut state = self.state.write();
833        let instance = find_instance_by_arn_mut(&mut state, &resource_name)?;
834        instance
835            .tags
836            .retain(|tag| !tag_keys.iter().any(|key| key == &tag.key));
837
838        Ok(AwsResponse::xml(
839            StatusCode::OK,
840            xml_wrap("RemoveTagsFromResource", "", &request.request_id),
841        ))
842    }
843
844    async fn create_db_snapshot(
845        &self,
846        request: &AwsRequest,
847    ) -> Result<AwsResponse, AwsServiceError> {
848        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
849        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
850
851        let runtime = self.runtime.as_ref().ok_or_else(|| {
852            AwsServiceError::aws_error(
853                StatusCode::SERVICE_UNAVAILABLE,
854                "InvalidParameterValue",
855                "Docker/Podman is required for RDS snapshots but is not available",
856            )
857        })?;
858
859        let (instance, db_name) = {
860            let state = self.state.write();
861
862            if state.snapshots.contains_key(&db_snapshot_identifier) {
863                return Err(AwsServiceError::aws_error(
864                    StatusCode::CONFLICT,
865                    "DBSnapshotAlreadyExists",
866                    format!("DBSnapshot {db_snapshot_identifier} already exists."),
867                ));
868            }
869
870            let instance = state
871                .instances
872                .get(&db_instance_identifier)
873                .cloned()
874                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
875
876            let default_db = default_db_name(&instance.engine);
877            let db_name = instance
878                .db_name
879                .as_deref()
880                .unwrap_or(default_db)
881                .to_string();
882
883            (instance, db_name)
884        };
885
886        let dump_data = runtime
887            .dump_database(
888                &db_instance_identifier,
889                &instance.engine,
890                &instance.master_username,
891                &instance.master_user_password,
892                &db_name,
893            )
894            .await
895            .map_err(runtime_error_to_service_error)?;
896
897        let mut state = self.state.write();
898
899        if state.snapshots.contains_key(&db_snapshot_identifier) {
900            return Err(AwsServiceError::aws_error(
901                StatusCode::CONFLICT,
902                "DBSnapshotAlreadyExists",
903                format!("DBSnapshot {db_snapshot_identifier} already exists."),
904            ));
905        }
906
907        let snapshot = DbSnapshot {
908            db_snapshot_identifier: db_snapshot_identifier.clone(),
909            db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
910            db_instance_identifier: instance.db_instance_identifier.clone(),
911            snapshot_create_time: Utc::now(),
912            engine: instance.engine.clone(),
913            engine_version: instance.engine_version.clone(),
914            allocated_storage: instance.allocated_storage,
915            status: "available".to_string(),
916            port: instance.port,
917            master_username: instance.master_username.clone(),
918            db_name: instance.db_name.clone(),
919            dbi_resource_id: instance.dbi_resource_id.clone(),
920            snapshot_type: "manual".to_string(),
921            master_user_password: instance.master_user_password.clone(),
922            tags: Vec::new(),
923            dump_data,
924        };
925
926        state
927            .snapshots
928            .insert(db_snapshot_identifier, snapshot.clone());
929
930        Ok(AwsResponse::xml(
931            StatusCode::OK,
932            xml_wrap(
933                "CreateDBSnapshot",
934                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
935                &request.request_id,
936            ),
937        ))
938    }
939
940    fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
941        let db_snapshot_identifier = optional_param(request, "DBSnapshotIdentifier");
942        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
943        let marker = optional_param(request, "Marker");
944        let max_records = optional_param(request, "MaxRecords");
945
946        if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
947            return Err(AwsServiceError::aws_error(
948                StatusCode::BAD_REQUEST,
949                "InvalidParameterCombination",
950                "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
951            ));
952        }
953
954        let state = self.state.read();
955
956        // If specific snapshot requested, return just that one (no pagination)
957        if let Some(snapshot_id) = db_snapshot_identifier {
958            let snapshot = state
959                .snapshots
960                .get(&snapshot_id)
961                .cloned()
962                .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
963
964            return Ok(AwsResponse::xml(
965                StatusCode::OK,
966                xml_wrap(
967                    "DescribeDBSnapshots",
968                    &format!(
969                        "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
970                        db_snapshot_xml(&snapshot)
971                    ),
972                    &request.request_id,
973                ),
974            ));
975        }
976
977        // Get snapshots, filtered by instance identifier if provided
978        let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
979            state
980                .snapshots
981                .values()
982                .filter(|s| s.db_instance_identifier == instance_id)
983                .cloned()
984                .collect()
985        } else {
986            state.snapshots.values().cloned().collect()
987        };
988
989        // Sort by creation time, then identifier
990        snapshots.sort_by(|a, b| {
991            a.snapshot_create_time
992                .cmp(&b.snapshot_create_time)
993                .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
994        });
995
996        // Apply pagination
997        let paginated = paginate(snapshots, marker, max_records, |snap| {
998            &snap.db_snapshot_identifier
999        })?;
1000
1001        let marker_xml = paginated
1002            .next_marker
1003            .as_ref()
1004            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1005            .unwrap_or_default();
1006
1007        Ok(AwsResponse::xml(
1008            StatusCode::OK,
1009            xml_wrap(
1010                "DescribeDBSnapshots",
1011                &format!(
1012                    "<DBSnapshots>{}</DBSnapshots>{}",
1013                    paginated
1014                        .items
1015                        .iter()
1016                        .map(|snapshot| format!(
1017                            "<DBSnapshot>{}</DBSnapshot>",
1018                            db_snapshot_xml(snapshot)
1019                        ))
1020                        .collect::<String>(),
1021                    marker_xml
1022                ),
1023                &request.request_id,
1024            ),
1025        ))
1026    }
1027
1028    fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1029        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1030
1031        let mut state = self.state.write();
1032
1033        let snapshot = state
1034            .snapshots
1035            .remove(&db_snapshot_identifier)
1036            .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1037
1038        Ok(AwsResponse::xml(
1039            StatusCode::OK,
1040            xml_wrap(
1041                "DeleteDBSnapshot",
1042                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1043                &request.request_id,
1044            ),
1045        ))
1046    }
1047
1048    async fn restore_db_instance_from_db_snapshot(
1049        &self,
1050        request: &AwsRequest,
1051    ) -> Result<AwsResponse, AwsServiceError> {
1052        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1053        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1054        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
1055
1056        let runtime = self.runtime.as_ref().ok_or_else(|| {
1057            AwsServiceError::aws_error(
1058                StatusCode::SERVICE_UNAVAILABLE,
1059                "InvalidParameterValue",
1060                "Docker/Podman is required for RDS DB instances but is not available",
1061            )
1062        })?;
1063
1064        let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
1065            let mut state = self.state.write();
1066
1067            if !state.begin_instance_creation(&db_instance_identifier) {
1068                return Err(AwsServiceError::aws_error(
1069                    StatusCode::CONFLICT,
1070                    "DBInstanceAlreadyExists",
1071                    format!("DBInstance {db_instance_identifier} already exists."),
1072                ));
1073            }
1074
1075            let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
1076                Some(s) => s,
1077                None => {
1078                    state.cancel_instance_creation(&db_instance_identifier);
1079                    return Err(db_snapshot_not_found(&db_snapshot_identifier));
1080                }
1081            };
1082
1083            let dbi_resource_id = state.next_dbi_resource_id();
1084            let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
1085            let created_at = Utc::now();
1086
1087            (snapshot, dbi_resource_id, db_instance_arn, created_at)
1088        };
1089
1090        let db_name = snapshot
1091            .db_name
1092            .as_deref()
1093            .unwrap_or(default_db_name(&snapshot.engine));
1094        let running = match runtime
1095            .ensure_postgres(
1096                &db_instance_identifier,
1097                &snapshot.engine,
1098                &snapshot.engine_version,
1099                &snapshot.master_username,
1100                &snapshot.master_user_password,
1101                db_name,
1102            )
1103            .await
1104        {
1105            Ok(running) => running,
1106            Err(e) => {
1107                self.state
1108                    .write()
1109                    .cancel_instance_creation(&db_instance_identifier);
1110                return Err(runtime_error_to_service_error(e));
1111            }
1112        };
1113
1114        if let Err(e) = runtime
1115            .restore_database(
1116                &db_instance_identifier,
1117                &snapshot.engine,
1118                &snapshot.master_username,
1119                &snapshot.master_user_password,
1120                db_name,
1121                &snapshot.dump_data,
1122            )
1123            .await
1124        {
1125            self.state
1126                .write()
1127                .cancel_instance_creation(&db_instance_identifier);
1128            runtime.stop_container(&db_instance_identifier).await;
1129            return Err(runtime_error_to_service_error(e));
1130        }
1131
1132        let mut state = self.state.write();
1133
1134        let instance = DbInstance {
1135            db_instance_identifier: db_instance_identifier.clone(),
1136            db_instance_arn,
1137            db_instance_class: "db.t3.micro".to_string(),
1138            engine: snapshot.engine.clone(),
1139            engine_version: snapshot.engine_version.clone(),
1140            db_instance_status: "available".to_string(),
1141            master_username: snapshot.master_username.clone(),
1142            db_name: snapshot.db_name.clone(),
1143            endpoint_address: "127.0.0.1".to_string(),
1144            port: i32::from(running.host_port),
1145            allocated_storage: snapshot.allocated_storage,
1146            publicly_accessible: true,
1147            deletion_protection: false,
1148            created_at,
1149            dbi_resource_id,
1150            master_user_password: snapshot.master_user_password.clone(),
1151            container_id: running.container_id,
1152            host_port: running.host_port,
1153            tags: Vec::new(),
1154            read_replica_source_db_instance_identifier: None,
1155            read_replica_db_instance_identifiers: Vec::new(),
1156            vpc_security_group_ids,
1157            db_parameter_group_name: None,
1158            backup_retention_period: 1,
1159            preferred_backup_window: "03:00-04:00".to_string(),
1160            latest_restorable_time: Some(created_at),
1161            option_group_name: None,
1162            multi_az: false,
1163            pending_modified_values: None,
1164        };
1165
1166        state.finish_instance_creation(instance.clone());
1167
1168        Ok(AwsResponse::xml(
1169            StatusCode::OK,
1170            xml_wrap(
1171                "RestoreDBInstanceFromDBSnapshot",
1172                &format!(
1173                    "<DBInstance>{}</DBInstance>",
1174                    db_instance_xml(&instance, None)
1175                ),
1176                &request.request_id,
1177            ),
1178        ))
1179    }
1180
1181    async fn create_db_instance_read_replica(
1182        &self,
1183        request: &AwsRequest,
1184    ) -> Result<AwsResponse, AwsServiceError> {
1185        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1186        let source_db_instance_identifier = required_param(request, "SourceDBInstanceIdentifier")?;
1187
1188        let runtime = self.runtime.as_ref().ok_or_else(|| {
1189            AwsServiceError::aws_error(
1190                StatusCode::SERVICE_UNAVAILABLE,
1191                "InvalidParameterValue",
1192                "Docker/Podman is required for RDS read replicas but is not available",
1193            )
1194        })?;
1195
1196        let (source_instance, db_name) = {
1197            let mut state = self.state.write();
1198
1199            if !state.begin_instance_creation(&db_instance_identifier) {
1200                return Err(AwsServiceError::aws_error(
1201                    StatusCode::CONFLICT,
1202                    "DBInstanceAlreadyExists",
1203                    format!("DBInstance {db_instance_identifier} already exists."),
1204                ));
1205            }
1206
1207            let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
1208            {
1209                Some(inst) => inst,
1210                None => {
1211                    state.cancel_instance_creation(&db_instance_identifier);
1212                    return Err(db_instance_not_found(&source_db_instance_identifier));
1213                }
1214            };
1215
1216            let default_db = default_db_name(&source_instance.engine);
1217            let db_name = source_instance
1218                .db_name
1219                .as_deref()
1220                .unwrap_or(default_db)
1221                .to_string();
1222
1223            (source_instance, db_name)
1224        };
1225
1226        let dump_data = match runtime
1227            .dump_database(
1228                &source_db_instance_identifier,
1229                &source_instance.engine,
1230                &source_instance.master_username,
1231                &source_instance.master_user_password,
1232                &db_name,
1233            )
1234            .await
1235        {
1236            Ok(data) => data,
1237            Err(e) => {
1238                self.state
1239                    .write()
1240                    .cancel_instance_creation(&db_instance_identifier);
1241                return Err(runtime_error_to_service_error(e));
1242            }
1243        };
1244
1245        let dbi_resource_id = self.state.read().next_dbi_resource_id();
1246        let db_instance_arn = self.state.read().db_instance_arn(&db_instance_identifier);
1247        let created_at = Utc::now();
1248
1249        let running = match runtime
1250            .ensure_postgres(
1251                &db_instance_identifier,
1252                &source_instance.engine,
1253                &source_instance.engine_version,
1254                &source_instance.master_username,
1255                &source_instance.master_user_password,
1256                &db_name,
1257            )
1258            .await
1259        {
1260            Ok(running) => running,
1261            Err(e) => {
1262                self.state
1263                    .write()
1264                    .cancel_instance_creation(&db_instance_identifier);
1265                return Err(runtime_error_to_service_error(e));
1266            }
1267        };
1268
1269        if let Err(e) = runtime
1270            .restore_database(
1271                &db_instance_identifier,
1272                &source_instance.engine,
1273                &source_instance.master_username,
1274                &source_instance.master_user_password,
1275                &db_name,
1276                &dump_data,
1277            )
1278            .await
1279        {
1280            self.state
1281                .write()
1282                .cancel_instance_creation(&db_instance_identifier);
1283            runtime.stop_container(&db_instance_identifier).await;
1284            return Err(runtime_error_to_service_error(e));
1285        }
1286
1287        let replica = DbInstance {
1288            db_instance_identifier: db_instance_identifier.clone(),
1289            db_instance_arn,
1290            db_instance_class: source_instance.db_instance_class.clone(),
1291            engine: source_instance.engine.clone(),
1292            engine_version: source_instance.engine_version.clone(),
1293            db_instance_status: "available".to_string(),
1294            master_username: source_instance.master_username.clone(),
1295            db_name: source_instance.db_name.clone(),
1296            endpoint_address: "127.0.0.1".to_string(),
1297            port: i32::from(running.host_port),
1298            allocated_storage: source_instance.allocated_storage,
1299            publicly_accessible: source_instance.publicly_accessible,
1300            deletion_protection: false,
1301            created_at,
1302            dbi_resource_id,
1303            master_user_password: source_instance.master_user_password.clone(),
1304            container_id: running.container_id,
1305            host_port: running.host_port,
1306            tags: Vec::new(),
1307            read_replica_source_db_instance_identifier: Some(source_db_instance_identifier.clone()),
1308            read_replica_db_instance_identifiers: Vec::new(),
1309            vpc_security_group_ids: source_instance.vpc_security_group_ids.clone(),
1310            db_parameter_group_name: source_instance.db_parameter_group_name.clone(),
1311            backup_retention_period: source_instance.backup_retention_period,
1312            preferred_backup_window: source_instance.preferred_backup_window.clone(),
1313            latest_restorable_time: if source_instance.backup_retention_period > 0 {
1314                Some(created_at)
1315            } else {
1316                None
1317            },
1318            option_group_name: source_instance.option_group_name.clone(),
1319            multi_az: source_instance.multi_az,
1320            pending_modified_values: None,
1321        };
1322
1323        let source_missing = {
1324            let mut state = self.state.write();
1325            match state.instances.get_mut(&source_db_instance_identifier) {
1326                Some(source) => {
1327                    source
1328                        .read_replica_db_instance_identifiers
1329                        .push(db_instance_identifier.clone());
1330                    state.finish_instance_creation(replica.clone());
1331                    false
1332                }
1333                None => {
1334                    state.cancel_instance_creation(&db_instance_identifier);
1335                    true
1336                }
1337            }
1338        };
1339
1340        if source_missing {
1341            runtime.stop_container(&db_instance_identifier).await;
1342            return Err(db_instance_not_found(&source_db_instance_identifier));
1343        }
1344
1345        Ok(AwsResponse::xml(
1346            StatusCode::OK,
1347            xml_wrap(
1348                "CreateDBInstanceReadReplica",
1349                &format!(
1350                    "<DBInstance>{}</DBInstance>",
1351                    db_instance_xml(&replica, None)
1352                ),
1353                &request.request_id,
1354            ),
1355        ))
1356    }
1357
1358    fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1359        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1360        let db_subnet_group_description = required_param(request, "DBSubnetGroupDescription")?;
1361        let subnet_ids = parse_subnet_ids(request)?;
1362
1363        if subnet_ids.is_empty() {
1364            return Err(AwsServiceError::aws_error(
1365                StatusCode::BAD_REQUEST,
1366                "InvalidParameterValue",
1367                "At least one subnet must be specified.",
1368            ));
1369        }
1370
1371        if subnet_ids.len() < 2 {
1372            return Err(AwsServiceError::aws_error(
1373                StatusCode::BAD_REQUEST,
1374                "DBSubnetGroupDoesNotCoverEnoughAZs",
1375                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1376            ));
1377        }
1378
1379        let mut state = self.state.write();
1380
1381        if state.subnet_groups.contains_key(&db_subnet_group_name) {
1382            return Err(AwsServiceError::aws_error(
1383                StatusCode::CONFLICT,
1384                "DBSubnetGroupAlreadyExists",
1385                format!("DBSubnetGroup {db_subnet_group_name} already exists."),
1386            ));
1387        }
1388
1389        let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
1390        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1391            .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
1392            .collect();
1393
1394        // Validate that subnets span at least 2 unique Availability Zones
1395        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1396        if unique_azs.len() < 2 {
1397            return Err(AwsServiceError::aws_error(
1398                StatusCode::BAD_REQUEST,
1399                "DBSubnetGroupDoesNotCoverEnoughAZs",
1400                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1401            ));
1402        }
1403
1404        let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
1405        let tags = parse_tags(request)?;
1406
1407        let subnet_group = DbSubnetGroup {
1408            db_subnet_group_name: db_subnet_group_name.clone(),
1409            db_subnet_group_arn,
1410            db_subnet_group_description,
1411            vpc_id,
1412            subnet_ids,
1413            subnet_availability_zones,
1414            tags,
1415        };
1416
1417        state
1418            .subnet_groups
1419            .insert(db_subnet_group_name, subnet_group.clone());
1420
1421        Ok(AwsResponse::xml(
1422            StatusCode::OK,
1423            xml_wrap(
1424                "CreateDBSubnetGroup",
1425                &format!(
1426                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1427                    db_subnet_group_xml(&subnet_group)
1428                ),
1429                &request.request_id,
1430            ),
1431        ))
1432    }
1433
1434    fn describe_db_subnet_groups(
1435        &self,
1436        request: &AwsRequest,
1437    ) -> Result<AwsResponse, AwsServiceError> {
1438        let db_subnet_group_name = optional_param(request, "DBSubnetGroupName");
1439        let marker = optional_param(request, "Marker");
1440        let max_records = optional_param(request, "MaxRecords");
1441
1442        let state = self.state.read();
1443
1444        // If specific subnet group requested, return just that one (no pagination)
1445        if let Some(name) = db_subnet_group_name {
1446            let sg = state.subnet_groups.get(&name).ok_or_else(|| {
1447                AwsServiceError::aws_error(
1448                    StatusCode::NOT_FOUND,
1449                    "DBSubnetGroupNotFoundFault",
1450                    format!("DBSubnetGroup {} not found.", name),
1451                )
1452            })?;
1453
1454            return Ok(AwsResponse::xml(
1455                StatusCode::OK,
1456                xml_wrap(
1457                    "DescribeDBSubnetGroups",
1458                    &format!(
1459                        "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
1460                        db_subnet_group_xml(sg)
1461                    ),
1462                    &request.request_id,
1463                ),
1464            ));
1465        }
1466
1467        // Get all subnet groups sorted by name
1468        let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
1469        subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
1470
1471        // Apply pagination
1472        let paginated = paginate(subnet_groups, marker, max_records, |sg| {
1473            &sg.db_subnet_group_name
1474        })?;
1475
1476        let marker_xml = paginated
1477            .next_marker
1478            .as_ref()
1479            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1480            .unwrap_or_default();
1481
1482        let body = paginated
1483            .items
1484            .iter()
1485            .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
1486            .collect::<Vec<_>>()
1487            .join("");
1488
1489        Ok(AwsResponse::xml(
1490            StatusCode::OK,
1491            xml_wrap(
1492                "DescribeDBSubnetGroups",
1493                &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
1494                &request.request_id,
1495            ),
1496        ))
1497    }
1498
1499    fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1500        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1501
1502        let mut state = self.state.write();
1503
1504        if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
1505            return Err(AwsServiceError::aws_error(
1506                StatusCode::NOT_FOUND,
1507                "DBSubnetGroupNotFoundFault",
1508                format!("DBSubnetGroup {db_subnet_group_name} not found."),
1509            ));
1510        }
1511
1512        Ok(AwsResponse::xml(
1513            StatusCode::OK,
1514            xml_wrap("DeleteDBSubnetGroup", "", &request.request_id),
1515        ))
1516    }
1517
1518    fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1519        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1520        let subnet_ids = parse_subnet_ids(request)?;
1521
1522        if subnet_ids.is_empty() {
1523            return Err(AwsServiceError::aws_error(
1524                StatusCode::BAD_REQUEST,
1525                "InvalidParameterValue",
1526                "At least one subnet must be specified.",
1527            ));
1528        }
1529
1530        if subnet_ids.len() < 2 {
1531            return Err(AwsServiceError::aws_error(
1532                StatusCode::BAD_REQUEST,
1533                "DBSubnetGroupDoesNotCoverEnoughAZs",
1534                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1535            ));
1536        }
1537
1538        let mut state = self.state.write();
1539
1540        let region = state.region.clone();
1541
1542        let subnet_group = state
1543            .subnet_groups
1544            .get_mut(&db_subnet_group_name)
1545            .ok_or_else(|| {
1546                AwsServiceError::aws_error(
1547                    StatusCode::NOT_FOUND,
1548                    "DBSubnetGroupNotFoundFault",
1549                    format!("DBSubnetGroup {db_subnet_group_name} not found."),
1550                )
1551            })?;
1552
1553        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1554            .map(|i| format!("{}{}", &region, char::from(b'a' + (i % 6) as u8)))
1555            .collect();
1556
1557        // Validate that subnets span at least 2 unique Availability Zones
1558        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1559        if unique_azs.len() < 2 {
1560            return Err(AwsServiceError::aws_error(
1561                StatusCode::BAD_REQUEST,
1562                "DBSubnetGroupDoesNotCoverEnoughAZs",
1563                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1564            ));
1565        }
1566
1567        subnet_group.subnet_ids = subnet_ids;
1568        subnet_group.subnet_availability_zones = subnet_availability_zones;
1569
1570        let subnet_group_clone = subnet_group.clone();
1571
1572        Ok(AwsResponse::xml(
1573            StatusCode::OK,
1574            xml_wrap(
1575                "ModifyDBSubnetGroup",
1576                &format!(
1577                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1578                    db_subnet_group_xml(&subnet_group_clone)
1579                ),
1580                &request.request_id,
1581            ),
1582        ))
1583    }
1584
1585    fn create_db_parameter_group(
1586        &self,
1587        request: &AwsRequest,
1588    ) -> Result<AwsResponse, AwsServiceError> {
1589        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1590        let db_parameter_group_family = required_param(request, "DBParameterGroupFamily")?;
1591        let description = required_param(request, "Description")?;
1592
1593        // Validate parameter group family against supported engines and versions
1594        let valid_families = [
1595            "postgres16",
1596            "postgres15",
1597            "postgres14",
1598            "postgres13",
1599            "mysql8.0",
1600            "mysql5.7",
1601            "mariadb10.11",
1602            "mariadb10.6",
1603        ];
1604
1605        if !valid_families.contains(&db_parameter_group_family.as_str()) {
1606            return Err(AwsServiceError::aws_error(
1607                StatusCode::BAD_REQUEST,
1608                "InvalidParameterValue",
1609                format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
1610            ));
1611        }
1612
1613        let mut state = self.state.write();
1614
1615        if state
1616            .parameter_groups
1617            .contains_key(&db_parameter_group_name)
1618        {
1619            return Err(AwsServiceError::aws_error(
1620                StatusCode::CONFLICT,
1621                "DBParameterGroupAlreadyExists",
1622                format!("DBParameterGroup {db_parameter_group_name} already exists."),
1623            ));
1624        }
1625
1626        let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
1627        let tags = parse_tags(request)?;
1628
1629        let parameter_group = DbParameterGroup {
1630            db_parameter_group_name: db_parameter_group_name.clone(),
1631            db_parameter_group_arn,
1632            db_parameter_group_family,
1633            description,
1634            parameters: std::collections::HashMap::new(),
1635            tags,
1636        };
1637
1638        state
1639            .parameter_groups
1640            .insert(db_parameter_group_name, parameter_group.clone());
1641
1642        Ok(AwsResponse::xml(
1643            StatusCode::OK,
1644            xml_wrap(
1645                "CreateDBParameterGroup",
1646                &format!(
1647                    "<DBParameterGroup>{}</DBParameterGroup>",
1648                    db_parameter_group_xml(&parameter_group)
1649                ),
1650                &request.request_id,
1651            ),
1652        ))
1653    }
1654
1655    fn describe_db_parameter_groups(
1656        &self,
1657        request: &AwsRequest,
1658    ) -> Result<AwsResponse, AwsServiceError> {
1659        let db_parameter_group_name = optional_param(request, "DBParameterGroupName");
1660        let marker = optional_param(request, "Marker");
1661        let max_records = optional_param(request, "MaxRecords");
1662
1663        let state = self.state.read();
1664
1665        // If specific parameter group requested, return just that one (no pagination)
1666        if let Some(name) = db_parameter_group_name {
1667            let pg = state.parameter_groups.get(&name).ok_or_else(|| {
1668                AwsServiceError::aws_error(
1669                    StatusCode::NOT_FOUND,
1670                    "DBParameterGroupNotFound",
1671                    format!("DBParameterGroup {} not found.", name),
1672                )
1673            })?;
1674
1675            return Ok(AwsResponse::xml(
1676                StatusCode::OK,
1677                xml_wrap(
1678                    "DescribeDBParameterGroups",
1679                    &format!(
1680                        "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
1681                        db_parameter_group_xml(pg)
1682                    ),
1683                    &request.request_id,
1684                ),
1685            ));
1686        }
1687
1688        // Get all parameter groups sorted by name
1689        let mut parameter_groups: Vec<DbParameterGroup> =
1690            state.parameter_groups.values().cloned().collect();
1691        parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
1692
1693        // Apply pagination
1694        let paginated = paginate(parameter_groups, marker, max_records, |pg| {
1695            &pg.db_parameter_group_name
1696        })?;
1697
1698        let marker_xml = paginated
1699            .next_marker
1700            .as_ref()
1701            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1702            .unwrap_or_default();
1703
1704        let body = paginated
1705            .items
1706            .iter()
1707            .map(|pg| {
1708                format!(
1709                    "<DBParameterGroup>{}</DBParameterGroup>",
1710                    db_parameter_group_xml(pg)
1711                )
1712            })
1713            .collect::<Vec<_>>()
1714            .join("");
1715
1716        Ok(AwsResponse::xml(
1717            StatusCode::OK,
1718            xml_wrap(
1719                "DescribeDBParameterGroups",
1720                &format!(
1721                    "<DBParameterGroups>{}</DBParameterGroups>{}",
1722                    body, marker_xml
1723                ),
1724                &request.request_id,
1725            ),
1726        ))
1727    }
1728
1729    fn delete_db_parameter_group(
1730        &self,
1731        request: &AwsRequest,
1732    ) -> Result<AwsResponse, AwsServiceError> {
1733        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1734
1735        let mut state = self.state.write();
1736
1737        if db_parameter_group_name.starts_with("default.") {
1738            return Err(AwsServiceError::aws_error(
1739                StatusCode::BAD_REQUEST,
1740                "InvalidParameterValue",
1741                "Cannot delete default parameter groups.",
1742            ));
1743        }
1744
1745        if state
1746            .parameter_groups
1747            .remove(&db_parameter_group_name)
1748            .is_none()
1749        {
1750            return Err(AwsServiceError::aws_error(
1751                StatusCode::NOT_FOUND,
1752                "DBParameterGroupNotFound",
1753                format!("DBParameterGroup {db_parameter_group_name} not found."),
1754            ));
1755        }
1756
1757        Ok(AwsResponse::xml(
1758            StatusCode::OK,
1759            xml_wrap("DeleteDBParameterGroup", "", &request.request_id),
1760        ))
1761    }
1762
1763    fn modify_db_parameter_group(
1764        &self,
1765        request: &AwsRequest,
1766    ) -> Result<AwsResponse, AwsServiceError> {
1767        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1768
1769        let mut state = self.state.write();
1770
1771        let parameter_group = state
1772            .parameter_groups
1773            .get_mut(&db_parameter_group_name)
1774            .ok_or_else(|| {
1775                AwsServiceError::aws_error(
1776                    StatusCode::NOT_FOUND,
1777                    "DBParameterGroupNotFound",
1778                    format!("DBParameterGroup {db_parameter_group_name} not found."),
1779                )
1780            })?;
1781
1782        if let Some(new_description) = optional_param(request, "Description") {
1783            parameter_group.description = new_description;
1784        }
1785
1786        let parameter_group_clone = parameter_group.clone();
1787
1788        Ok(AwsResponse::xml(
1789            StatusCode::OK,
1790            xml_wrap(
1791                "ModifyDBParameterGroup",
1792                &format!(
1793                    "<DBParameterGroupName>{}</DBParameterGroupName>",
1794                    xml_escape(&parameter_group_clone.db_parameter_group_name)
1795                ),
1796                &request.request_id,
1797            ),
1798        ))
1799    }
1800}
1801
1802fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
1803    req.query_params
1804        .get(name)
1805        .cloned()
1806        .filter(|value| !value.is_empty())
1807}
1808
1809fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
1810    optional_param(req, name).ok_or_else(|| {
1811        AwsServiceError::aws_error(
1812            StatusCode::BAD_REQUEST,
1813            "MissingParameter",
1814            format!("The request must contain the parameter {name}."),
1815        )
1816    })
1817}
1818
1819fn required_i32_param(req: &AwsRequest, name: &str) -> Result<i32, AwsServiceError> {
1820    let value = required_param(req, name)?;
1821    value.parse::<i32>().map_err(|_| {
1822        AwsServiceError::aws_error(
1823            StatusCode::BAD_REQUEST,
1824            "InvalidParameterValue",
1825            format!("Parameter {name} must be a valid integer."),
1826        )
1827    })
1828}
1829
1830fn optional_i32_param(req: &AwsRequest, name: &str) -> Result<Option<i32>, AwsServiceError> {
1831    optional_param(req, name)
1832        .map(|value| {
1833            value.parse::<i32>().map_err(|_| {
1834                AwsServiceError::aws_error(
1835                    StatusCode::BAD_REQUEST,
1836                    "InvalidParameterValue",
1837                    format!("Parameter {name} must be a valid integer."),
1838                )
1839            })
1840        })
1841        .transpose()
1842}
1843
1844fn parse_tags(req: &AwsRequest) -> Result<Vec<RdsTag>, AwsServiceError> {
1845    let mut tags = Vec::new();
1846    for index in 1.. {
1847        let key_name = format!("Tags.Tag.{index}.Key");
1848        let value_name = format!("Tags.Tag.{index}.Value");
1849        let key = optional_param(req, &key_name);
1850        let value = optional_param(req, &value_name);
1851
1852        match (key, value) {
1853            (Some(key), Some(value)) => tags.push(RdsTag { key, value }),
1854            (None, None) => break,
1855            _ => {
1856                return Err(AwsServiceError::aws_error(
1857                    StatusCode::BAD_REQUEST,
1858                    "InvalidParameterValue",
1859                    "Each tag must include both Key and Value.",
1860                ));
1861            }
1862        }
1863    }
1864
1865    Ok(tags)
1866}
1867
1868fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
1869    let mut keys = Vec::new();
1870    for index in 1.. {
1871        let key_name = format!("TagKeys.member.{index}");
1872        match optional_param(req, &key_name) {
1873            Some(key) => keys.push(key),
1874            None => break,
1875        }
1876    }
1877
1878    Ok(keys)
1879}
1880
1881fn parse_subnet_ids(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
1882    let mut subnet_ids = Vec::new();
1883    for index in 1.. {
1884        let subnet_id_name = format!("SubnetIds.SubnetIdentifier.{index}");
1885        match optional_param(req, &subnet_id_name) {
1886            Some(subnet_id) => subnet_ids.push(subnet_id),
1887            None => break,
1888        }
1889    }
1890
1891    Ok(subnet_ids)
1892}
1893
1894fn parse_vpc_security_group_ids(req: &AwsRequest) -> Vec<String> {
1895    let mut security_group_ids = Vec::new();
1896    for index in 1.. {
1897        let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
1898        match optional_param(req, &sg_id_name) {
1899            Some(sg_id) => security_group_ids.push(sg_id),
1900            None => break,
1901        }
1902    }
1903
1904    // If no security groups provided, return a default one
1905    if security_group_ids.is_empty() {
1906        security_group_ids.push("sg-default".to_string());
1907    }
1908
1909    security_group_ids
1910}
1911
1912fn query_param_prefix_exists(req: &AwsRequest, prefix: &str) -> bool {
1913    req.query_params.keys().any(|key| key.starts_with(prefix))
1914}
1915
1916fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
1917    value
1918        .map(|raw| match raw {
1919            "true" | "True" | "TRUE" => Ok(true),
1920            "false" | "False" | "FALSE" => Ok(false),
1921            _ => Err(AwsServiceError::aws_error(
1922                StatusCode::BAD_REQUEST,
1923                "InvalidParameterValue",
1924                format!("Boolean parameter value '{raw}' is invalid."),
1925            )),
1926        })
1927        .transpose()
1928}
1929
1930struct PaginationResult<T> {
1931    items: Vec<T>,
1932    next_marker: Option<String>,
1933}
1934
1935fn paginate<T, F>(
1936    mut items: Vec<T>,
1937    marker: Option<String>,
1938    max_records: Option<String>,
1939    get_id: F,
1940) -> Result<PaginationResult<T>, AwsServiceError>
1941where
1942    F: Fn(&T) -> &str,
1943{
1944    // Parse max_records with default 100, max 100
1945    let max = if let Some(max_str) = max_records {
1946        let parsed = max_str.parse::<i32>().map_err(|_| {
1947            AwsServiceError::aws_error(
1948                StatusCode::BAD_REQUEST,
1949                "InvalidParameterValue",
1950                "MaxRecords must be a valid integer.",
1951            )
1952        })?;
1953        if !(1..=100).contains(&parsed) {
1954            return Err(AwsServiceError::aws_error(
1955                StatusCode::BAD_REQUEST,
1956                "InvalidParameterValue",
1957                "MaxRecords must be between 1 and 100.",
1958            ));
1959        }
1960        parsed as usize
1961    } else {
1962        100
1963    };
1964
1965    // Decode marker to get starting identifier
1966    let start_id = if let Some(encoded_marker) = marker {
1967        let decoded = BASE64.decode(encoded_marker.as_bytes()).map_err(|_| {
1968            AwsServiceError::aws_error(
1969                StatusCode::BAD_REQUEST,
1970                "InvalidParameterValue",
1971                "Marker is invalid.",
1972            )
1973        })?;
1974        let id = String::from_utf8(decoded).map_err(|_| {
1975            AwsServiceError::aws_error(
1976                StatusCode::BAD_REQUEST,
1977                "InvalidParameterValue",
1978                "Marker is invalid.",
1979            )
1980        })?;
1981        Some(id)
1982    } else {
1983        None
1984    };
1985
1986    // Find starting position
1987    let start_index = if let Some(ref start_id) = start_id {
1988        items
1989            .iter()
1990            .position(|item| get_id(item) == start_id)
1991            .map(|pos| pos + 1) // Start after the marker
1992            .unwrap_or(items.len()) // If not found, return empty result
1993    } else {
1994        0
1995    };
1996
1997    // Take items from start_index
1998    let total_items = items.len();
1999    let end_index = std::cmp::min(start_index + max, total_items);
2000    let paginated_items: Vec<T> = items.drain(start_index..end_index).collect();
2001
2002    // Create next marker if there are more items
2003    let next_marker = if end_index < total_items {
2004        paginated_items
2005            .last()
2006            .map(|item| BASE64.encode(get_id(item).as_bytes()))
2007    } else {
2008        None
2009    };
2010
2011    Ok(PaginationResult {
2012        items: paginated_items,
2013        next_marker,
2014    })
2015}
2016
2017fn validate_create_request(
2018    db_instance_identifier: &str,
2019    allocated_storage: i32,
2020    db_instance_class: &str,
2021    engine: &str,
2022    engine_version: &str,
2023    port: i32,
2024) -> Result<(), AwsServiceError> {
2025    if allocated_storage <= 0 {
2026        return Err(AwsServiceError::aws_error(
2027            StatusCode::BAD_REQUEST,
2028            "InvalidParameterValue",
2029            "AllocatedStorage must be greater than zero.",
2030        ));
2031    }
2032    if port <= 0 {
2033        return Err(AwsServiceError::aws_error(
2034            StatusCode::BAD_REQUEST,
2035            "InvalidParameterValue",
2036            "Port must be greater than zero.",
2037        ));
2038    }
2039    if !db_instance_identifier
2040        .chars()
2041        .all(|ch| ch.is_ascii_alphanumeric() || ch == '-')
2042    {
2043        return Err(AwsServiceError::aws_error(
2044            StatusCode::BAD_REQUEST,
2045            "InvalidParameterValue",
2046            "DBInstanceIdentifier must contain only alphanumeric characters or hyphens.",
2047        ));
2048    }
2049    // Validate engine
2050    let supported_engines = ["postgres", "mysql", "mariadb"];
2051    if !supported_engines.contains(&engine) {
2052        return Err(AwsServiceError::aws_error(
2053            StatusCode::BAD_REQUEST,
2054            "InvalidParameterValue",
2055            format!("Engine '{}' is not supported.", engine),
2056        ));
2057    }
2058
2059    // Validate engine version
2060    let supported_versions = match engine {
2061        "postgres" => vec!["16.3", "15.5", "14.10", "13.13"],
2062        "mysql" => vec!["8.0.35", "8.0.28", "5.7.44"],
2063        "mariadb" => vec!["10.11.6", "10.6.16"],
2064        _ => vec![],
2065    };
2066
2067    if !supported_versions.contains(&engine_version) {
2068        return Err(AwsServiceError::aws_error(
2069            StatusCode::BAD_REQUEST,
2070            "InvalidParameterValue",
2071            format!("EngineVersion '{engine_version}' is not supported yet."),
2072        ));
2073    }
2074    validate_db_instance_class(db_instance_class)?;
2075    Ok(())
2076}
2077
2078fn validate_db_instance_class(db_instance_class: &str) -> Result<(), AwsServiceError> {
2079    if !crate::state::SUPPORTED_INSTANCE_CLASSES.contains(&db_instance_class) {
2080        return Err(AwsServiceError::aws_error(
2081            StatusCode::BAD_REQUEST,
2082            "InvalidParameterValue",
2083            format!("DBInstanceClass '{}' is not supported.", db_instance_class),
2084        ));
2085    }
2086    Ok(())
2087}
2088
2089fn filter_engine_versions(
2090    versions: &[EngineVersionInfo],
2091    engine: &Option<String>,
2092    engine_version: &Option<String>,
2093    family: &Option<String>,
2094) -> Vec<EngineVersionInfo> {
2095    versions
2096        .iter()
2097        .filter(|candidate| {
2098            engine
2099                .as_ref()
2100                .is_none_or(|expected| candidate.engine == *expected)
2101        })
2102        .filter(|candidate| {
2103            engine_version
2104                .as_ref()
2105                .is_none_or(|expected| candidate.engine_version == *expected)
2106        })
2107        .filter(|candidate| {
2108            family
2109                .as_ref()
2110                .is_none_or(|expected| candidate.db_parameter_group_family == *expected)
2111        })
2112        .cloned()
2113        .collect()
2114}
2115
2116fn filter_orderable_options(
2117    options: &[OrderableDbInstanceOption],
2118    engine: &Option<String>,
2119    engine_version: &Option<String>,
2120    db_instance_class: &Option<String>,
2121    license_model: &Option<String>,
2122    vpc: Option<bool>,
2123) -> Vec<OrderableDbInstanceOption> {
2124    options
2125        .iter()
2126        .filter(|candidate| {
2127            engine
2128                .as_ref()
2129                .is_none_or(|expected| candidate.engine == *expected)
2130        })
2131        .filter(|candidate| {
2132            engine_version
2133                .as_ref()
2134                .is_none_or(|expected| candidate.engine_version == *expected)
2135        })
2136        .filter(|candidate| {
2137            db_instance_class
2138                .as_ref()
2139                .is_none_or(|expected| candidate.db_instance_class == *expected)
2140        })
2141        .filter(|candidate| {
2142            license_model
2143                .as_ref()
2144                .is_none_or(|expected| candidate.license_model == *expected)
2145        })
2146        .filter(|_| vpc.unwrap_or(true))
2147        .cloned()
2148        .collect()
2149}
2150
2151fn xml_wrap(action: &str, inner: &str, request_id: &str) -> String {
2152    format!(
2153        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
2154         <{action}Response xmlns=\"{RDS_NS}\">\
2155         <{action}Result>{inner}</{action}Result>\
2156         <ResponseMetadata><RequestId>{request_id}</RequestId></ResponseMetadata>\
2157         </{action}Response>"
2158    )
2159}
2160
2161fn engine_version_xml(version: &EngineVersionInfo) -> String {
2162    format!(
2163        "<DBEngineVersion>\
2164         <Engine>{}</Engine>\
2165         <EngineVersion>{}</EngineVersion>\
2166         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2167         <DBEngineDescription>{}</DBEngineDescription>\
2168         <DBEngineVersionDescription>{}</DBEngineVersionDescription>\
2169         <Status>{}</Status>\
2170         </DBEngineVersion>",
2171        xml_escape(&version.engine),
2172        xml_escape(&version.engine_version),
2173        xml_escape(&version.db_parameter_group_family),
2174        xml_escape(&version.db_engine_description),
2175        xml_escape(&version.db_engine_version_description),
2176        xml_escape(&version.status),
2177    )
2178}
2179
2180fn orderable_option_xml(option: &OrderableDbInstanceOption) -> String {
2181    format!(
2182        "<OrderableDBInstanceOption>\
2183         <Engine>{}</Engine>\
2184         <EngineVersion>{}</EngineVersion>\
2185         <DBInstanceClass>{}</DBInstanceClass>\
2186         <LicenseModel>{}</LicenseModel>\
2187         <AvailabilityZones><AvailabilityZone><Name>us-east-1a</Name></AvailabilityZone></AvailabilityZones>\
2188         <MultiAZCapable>true</MultiAZCapable>\
2189         <ReadReplicaCapable>true</ReadReplicaCapable>\
2190         <Vpc>true</Vpc>\
2191         <SupportsStorageEncryption>true</SupportsStorageEncryption>\
2192         <StorageType>{}</StorageType>\
2193         <SupportsIops>false</SupportsIops>\
2194         <MinStorageSize>{}</MinStorageSize>\
2195         <MaxStorageSize>{}</MaxStorageSize>\
2196         <SupportsIAMDatabaseAuthentication>true</SupportsIAMDatabaseAuthentication>\
2197         </OrderableDBInstanceOption>",
2198        xml_escape(&option.engine),
2199        xml_escape(&option.engine_version),
2200        xml_escape(&option.db_instance_class),
2201        xml_escape(&option.license_model),
2202        xml_escape(&option.storage_type),
2203        option.min_storage_size,
2204        option.max_storage_size,
2205    )
2206}
2207
2208fn tag_xml(tag: &RdsTag) -> String {
2209    format!(
2210        "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2211        xml_escape(&tag.key),
2212        xml_escape(&tag.value),
2213    )
2214}
2215
2216fn db_instance_xml(instance: &DbInstance, status_override: Option<&str>) -> String {
2217    let status = status_override.unwrap_or(&instance.db_instance_status);
2218    let db_name_xml = instance
2219        .db_name
2220        .as_ref()
2221        .map(|db_name| format!("<DBName>{}</DBName>", xml_escape(db_name)))
2222        .unwrap_or_default();
2223
2224    let read_replica_source_xml = instance
2225        .read_replica_source_db_instance_identifier
2226        .as_ref()
2227        .map(|source| {
2228            format!(
2229                "<ReadReplicaSourceDBInstanceIdentifier>{}</ReadReplicaSourceDBInstanceIdentifier>",
2230                xml_escape(source)
2231            )
2232        })
2233        .unwrap_or_default();
2234
2235    let read_replica_identifiers_xml = if instance.read_replica_db_instance_identifiers.is_empty() {
2236        "<ReadReplicaDBInstanceIdentifiers/>".to_string()
2237    } else {
2238        format!(
2239            "<ReadReplicaDBInstanceIdentifiers>{}</ReadReplicaDBInstanceIdentifiers>",
2240            instance
2241                .read_replica_db_instance_identifiers
2242                .iter()
2243                .map(|id| format!(
2244                    "<ReadReplicaDBInstanceIdentifier>{}</ReadReplicaDBInstanceIdentifier>",
2245                    xml_escape(id)
2246                ))
2247                .collect::<String>()
2248        )
2249    };
2250
2251    let vpc_security_groups_xml = if instance.vpc_security_group_ids.is_empty() {
2252        "<VpcSecurityGroups/>".to_string()
2253    } else {
2254        format!(
2255            "<VpcSecurityGroups>{}</VpcSecurityGroups>",
2256            instance
2257                .vpc_security_group_ids
2258                .iter()
2259                .map(|sg_id| format!(
2260                    "<VpcSecurityGroupMembership>\
2261                     <VpcSecurityGroupId>{}</VpcSecurityGroupId>\
2262                     <Status>active</Status>\
2263                     </VpcSecurityGroupMembership>",
2264                    xml_escape(sg_id)
2265                ))
2266                .collect::<String>()
2267        )
2268    };
2269
2270    let db_parameter_groups_xml = match &instance.db_parameter_group_name {
2271        Some(pg_name) => format!(
2272            "<DBParameterGroups>\
2273             <DBParameterGroup>\
2274             <DBParameterGroupName>{}</DBParameterGroupName>\
2275             <ParameterApplyStatus>in-sync</ParameterApplyStatus>\
2276             </DBParameterGroup>\
2277             </DBParameterGroups>",
2278            xml_escape(pg_name)
2279        ),
2280        None => "<DBParameterGroups/>".to_string(),
2281    };
2282
2283    let option_group_memberships_xml = match &instance.option_group_name {
2284        Some(og_name) => format!(
2285            "<OptionGroupMemberships>\
2286             <OptionGroupMembership>\
2287             <OptionGroupName>{}</OptionGroupName>\
2288             <Status>in-sync</Status>\
2289             </OptionGroupMembership>\
2290             </OptionGroupMemberships>",
2291            xml_escape(og_name)
2292        ),
2293        None => "<OptionGroupMemberships/>".to_string(),
2294    };
2295
2296    let pending_modified_values_xml = if let Some(ref pending) = instance.pending_modified_values {
2297        let mut fields = Vec::new();
2298        if let Some(ref class) = pending.db_instance_class {
2299            fields.push(format!(
2300                "<DBInstanceClass>{}</DBInstanceClass>",
2301                xml_escape(class)
2302            ));
2303        }
2304        if let Some(allocated_storage) = pending.allocated_storage {
2305            fields.push(format!(
2306                "<AllocatedStorage>{}</AllocatedStorage>",
2307                allocated_storage
2308            ));
2309        }
2310        if let Some(backup_retention_period) = pending.backup_retention_period {
2311            fields.push(format!(
2312                "<BackupRetentionPeriod>{}</BackupRetentionPeriod>",
2313                backup_retention_period
2314            ));
2315        }
2316        if let Some(multi_az) = pending.multi_az {
2317            fields.push(format!(
2318                "<MultiAZ>{}</MultiAZ>",
2319                if multi_az { "true" } else { "false" }
2320            ));
2321        }
2322        if let Some(ref engine_version) = pending.engine_version {
2323            fields.push(format!(
2324                "<EngineVersion>{}</EngineVersion>",
2325                xml_escape(engine_version)
2326            ));
2327        }
2328        if pending.master_user_password.is_some() {
2329            fields.push("<MasterUserPassword>****</MasterUserPassword>".to_string());
2330        }
2331        if !fields.is_empty() {
2332            format!(
2333                "<PendingModifiedValues>{}</PendingModifiedValues>",
2334                fields.join("")
2335            )
2336        } else {
2337            String::new()
2338        }
2339    } else {
2340        String::new()
2341    };
2342
2343    format!(
2344        "<DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2345         <DBInstanceClass>{}</DBInstanceClass>\
2346         <Engine>{}</Engine>\
2347         <DBInstanceStatus>{}</DBInstanceStatus>\
2348         <MasterUsername>{}</MasterUsername>\
2349         {}\
2350         <Endpoint><Address>{}</Address><Port>{}</Port></Endpoint>\
2351         <AllocatedStorage>{}</AllocatedStorage>\
2352         <InstanceCreateTime>{}</InstanceCreateTime>\
2353         <PreferredBackupWindow>{}</PreferredBackupWindow>\
2354         <BackupRetentionPeriod>{}</BackupRetentionPeriod>\
2355         <DBSecurityGroups/>\
2356         {}\
2357         {}\
2358         <AvailabilityZone>us-east-1a</AvailabilityZone>\
2359         {}\
2360         <PreferredMaintenanceWindow>sun:00:00-sun:00:30</PreferredMaintenanceWindow>\
2361         <MultiAZ>{}</MultiAZ>\
2362         <EngineVersion>{}</EngineVersion>\
2363         <AutoMinorVersionUpgrade>true</AutoMinorVersionUpgrade>\
2364         {}\
2365         {}\
2366         <LicenseModel>{}</LicenseModel>\
2367         {}\
2368         <PubliclyAccessible>{}</PubliclyAccessible>\
2369         <StorageType>gp2</StorageType>\
2370         <DbInstancePort>{}</DbInstancePort>\
2371         <StorageEncrypted>false</StorageEncrypted>\
2372         <DbiResourceId>{}</DbiResourceId>\
2373         <DeletionProtection>{}</DeletionProtection>\
2374         {}\
2375         <DBInstanceArn>{}</DBInstanceArn>",
2376        xml_escape(&instance.db_instance_identifier),
2377        xml_escape(&instance.db_instance_class),
2378        xml_escape(&instance.engine),
2379        xml_escape(status),
2380        xml_escape(&instance.master_username),
2381        db_name_xml,
2382        xml_escape(&instance.endpoint_address),
2383        instance.port,
2384        instance.allocated_storage,
2385        instance.created_at.to_rfc3339(),
2386        xml_escape(&instance.preferred_backup_window),
2387        instance.backup_retention_period,
2388        vpc_security_groups_xml,
2389        db_parameter_groups_xml,
2390        instance
2391            .latest_restorable_time
2392            .map(|t| format!(
2393                "<LatestRestorableTime>{}</LatestRestorableTime>",
2394                t.to_rfc3339()
2395            ))
2396            .unwrap_or_default(),
2397        if instance.multi_az { "true" } else { "false" },
2398        xml_escape(&instance.engine_version),
2399        read_replica_identifiers_xml,
2400        read_replica_source_xml,
2401        license_model_for_engine(&instance.engine),
2402        option_group_memberships_xml,
2403        if instance.publicly_accessible {
2404            "true"
2405        } else {
2406            "false"
2407        },
2408        instance.port,
2409        xml_escape(&instance.dbi_resource_id),
2410        if instance.deletion_protection {
2411            "true"
2412        } else {
2413            "false"
2414        },
2415        pending_modified_values_xml,
2416        xml_escape(&instance.db_instance_arn),
2417    )
2418}
2419
2420fn db_snapshot_xml(snapshot: &DbSnapshot) -> String {
2421    format!(
2422        "<DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\
2423         <DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2424         <SnapshotCreateTime>{}</SnapshotCreateTime>\
2425         <Engine>{}</Engine>\
2426         <EngineVersion>{}</EngineVersion>\
2427         <AllocatedStorage>{}</AllocatedStorage>\
2428         <Status>{}</Status>\
2429         <Port>{}</Port>\
2430         <MasterUsername>{}</MasterUsername>\
2431         {}\
2432         <DbiResourceId>{}</DbiResourceId>\
2433         <SnapshotType>{}</SnapshotType>\
2434         <DBSnapshotArn>{}</DBSnapshotArn>",
2435        xml_escape(&snapshot.db_snapshot_identifier),
2436        xml_escape(&snapshot.db_instance_identifier),
2437        snapshot.snapshot_create_time.to_rfc3339(),
2438        xml_escape(&snapshot.engine),
2439        xml_escape(&snapshot.engine_version),
2440        snapshot.allocated_storage,
2441        xml_escape(&snapshot.status),
2442        snapshot.port,
2443        xml_escape(&snapshot.master_username),
2444        snapshot
2445            .db_name
2446            .as_ref()
2447            .map(|name| format!("<DBName>{}</DBName>", xml_escape(name)))
2448            .unwrap_or_default(),
2449        xml_escape(&snapshot.dbi_resource_id),
2450        xml_escape(&snapshot.snapshot_type),
2451        xml_escape(&snapshot.db_snapshot_arn),
2452    )
2453}
2454
2455fn db_subnet_group_xml(subnet_group: &DbSubnetGroup) -> String {
2456    let subnets_xml = subnet_group
2457        .subnet_ids
2458        .iter()
2459        .zip(&subnet_group.subnet_availability_zones)
2460        .map(|(subnet_id, az)| {
2461            format!(
2462                "<Subnet>\
2463                 <SubnetIdentifier>{}</SubnetIdentifier>\
2464                 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
2465                 <SubnetStatus>Active</SubnetStatus>\
2466                 </Subnet>",
2467                xml_escape(subnet_id),
2468                xml_escape(az)
2469            )
2470        })
2471        .collect::<String>();
2472
2473    format!(
2474        "<DBSubnetGroupName>{}</DBSubnetGroupName>\
2475         <DBSubnetGroupDescription>{}</DBSubnetGroupDescription>\
2476         <VpcId>{}</VpcId>\
2477         <SubnetGroupStatus>Complete</SubnetGroupStatus>\
2478         <Subnets>{}</Subnets>\
2479         <DBSubnetGroupArn>{}</DBSubnetGroupArn>",
2480        xml_escape(&subnet_group.db_subnet_group_name),
2481        xml_escape(&subnet_group.db_subnet_group_description),
2482        xml_escape(&subnet_group.vpc_id),
2483        subnets_xml,
2484        xml_escape(&subnet_group.db_subnet_group_arn),
2485    )
2486}
2487
2488fn db_parameter_group_xml(parameter_group: &DbParameterGroup) -> String {
2489    format!(
2490        "<DBParameterGroupName>{}</DBParameterGroupName>\
2491         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2492         <Description>{}</Description>\
2493         <DBParameterGroupArn>{}</DBParameterGroupArn>",
2494        xml_escape(&parameter_group.db_parameter_group_name),
2495        xml_escape(&parameter_group.db_parameter_group_family),
2496        xml_escape(&parameter_group.description),
2497        xml_escape(&parameter_group.db_parameter_group_arn),
2498    )
2499}
2500
2501fn db_instance_not_found(identifier: &str) -> AwsServiceError {
2502    AwsServiceError::aws_error(
2503        StatusCode::NOT_FOUND,
2504        "DBInstanceNotFound",
2505        format!("DBInstance {} not found.", identifier),
2506    )
2507}
2508
2509fn db_snapshot_not_found(identifier: &str) -> AwsServiceError {
2510    AwsServiceError::aws_error(
2511        StatusCode::NOT_FOUND,
2512        "DBSnapshotNotFound",
2513        format!("DBSnapshot {} not found.", identifier),
2514    )
2515}
2516
2517fn db_instance_not_found_by_arn(resource_name: &str) -> AwsServiceError {
2518    AwsServiceError::aws_error(
2519        StatusCode::NOT_FOUND,
2520        "DBInstanceNotFound",
2521        format!("DBInstance {resource_name} not found."),
2522    )
2523}
2524
2525fn find_instance_by_arn<'a>(
2526    state: &'a crate::state::RdsState,
2527    resource_name: &str,
2528) -> Result<&'a DbInstance, AwsServiceError> {
2529    state
2530        .instances
2531        .values()
2532        .find(|instance| instance.db_instance_arn == resource_name)
2533        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2534}
2535
2536fn find_instance_by_arn_mut<'a>(
2537    state: &'a mut crate::state::RdsState,
2538    resource_name: &str,
2539) -> Result<&'a mut DbInstance, AwsServiceError> {
2540    state
2541        .instances
2542        .values_mut()
2543        .find(|instance| instance.db_instance_arn == resource_name)
2544        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2545}
2546
2547fn merge_tags(existing: &mut Vec<RdsTag>, incoming: &[RdsTag]) {
2548    for tag in incoming {
2549        if let Some(existing_tag) = existing
2550            .iter_mut()
2551            .find(|candidate| candidate.key == tag.key)
2552        {
2553            existing_tag.value = tag.value.clone();
2554        } else {
2555            existing.push(tag.clone());
2556        }
2557    }
2558}
2559
/// Returns the license model string reported for `engine`.
///
/// `mysql` and `mariadb` map to `general-public-license`; every other engine
/// name falls back to `postgresql-license`.
fn license_model_for_engine(engine: &str) -> &'static str {
    if matches!(engine, "mysql" | "mariadb") {
        "general-public-license"
    } else {
        "postgresql-license"
    }
}
2566
/// Returns the default database name for `engine`.
///
/// `mysql` and `mariadb` map to `mysql`; every other engine name falls back to
/// `postgres`.
fn default_db_name(engine: &str) -> &'static str {
    let is_mysql_family = engine == "mysql" || engine == "mariadb";
    if is_mysql_family {
        "mysql"
    } else {
        "postgres"
    }
}
2573
2574fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
2575    match error {
2576        RuntimeError::Unavailable => AwsServiceError::aws_error(
2577            StatusCode::SERVICE_UNAVAILABLE,
2578            "InvalidParameterValue",
2579            "Docker/Podman is required for RDS DB instances but is not available",
2580        ),
2581        RuntimeError::ContainerStartFailed(message) => AwsServiceError::aws_error(
2582            StatusCode::INTERNAL_SERVER_ERROR,
2583            "InternalFailure",
2584            message,
2585        ),
2586    }
2587}
2588
// Unit tests for the RDS service helpers: catalogue filtering, request
// validation, query-parameter parsing, XML rendering, tag merging, and one
// end-to-end `DescribeDBEngineVersions` call through `RdsService::handle`.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use bytes::Bytes;
    use chrono::Utc;
    use http::{HeaderMap, Method};
    use parking_lot::RwLock;
    use uuid::Uuid;

    use super::{
        db_instance_xml, filter_engine_versions, filter_orderable_options, merge_tags,
        optional_i32_param, parse_tag_keys, parse_tags, validate_create_request, RdsService,
    };
    use crate::state::{
        default_engine_versions, default_orderable_options, DbInstance, RdsState, RdsTag,
    };
    use fakecloud_core::service::{AwsRequest, AwsService};

    // Filtering the default catalogue by engine `postgres` (all other filters
    // `None`) must yield four entries, every one of them a postgres entry.
    #[test]
    fn filter_engine_versions_matches_requested_engine() {
        let versions = default_engine_versions();

        let filtered =
            filter_engine_versions(&versions, &Some("postgres".to_string()), &None, &None);

        assert_eq!(filtered.len(), 4); // All postgres versions
        assert!(filtered.iter().all(|v| v.engine == "postgres"));
    }

    // Filtering the default orderable options with `postgres`, `16.3` and
    // `db.t3.micro` must leave exactly one option of that instance class.
    // The last two arguments (`None`, `Some(true)`) are further filters whose
    // meaning is defined by `filter_orderable_options`, outside this module.
    #[test]
    fn filter_orderable_options_respects_instance_class() {
        let options = default_orderable_options();

        let filtered = filter_orderable_options(
            &options,
            &Some("postgres".to_string()),
            &Some("16.3".to_string()),
            &Some("db.t3.micro".to_string()),
            &None,
            Some(true),
        );

        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].db_instance_class, "db.t3.micro");
    }

    // A create request using engine `mysql` must fail with `InvalidParameterValue`.
    // NOTE(review): the input pairs `mysql` with version `16.3`, and other helpers
    // in this file (`license_model_for_engine`, `default_db_name`) have
    // mysql/mariadb branches — confirm the rejection still comes from the engine
    // check rather than the version check.
    #[test]
    fn validate_create_request_rejects_unsupported_engine() {
        let error = validate_create_request("test-db", 20, "db.t3.micro", "mysql", "16.3", 5432)
            .expect_err("unsupported engine");

        assert_eq!(error.code(), "InvalidParameterValue");
    }

    // A non-numeric `Port` query parameter must be reported as
    // `InvalidParameterValue`.
    #[test]
    fn optional_i32_param_rejects_invalid_integer() {
        let request = request("CreateDBInstance", &[("Port", "not-a-number")]);

        let error = optional_i32_param(&request, "Port").expect_err("invalid port");

        assert_eq!(error.code(), "InvalidParameterValue");
    }

    // Rendering an instance with an explicit status argument must emit the
    // identifier, the supplied status (`creating`, not the stored `available`),
    // and the endpoint address immediately followed by the port.
    #[test]
    fn db_instance_xml_renders_endpoint_and_status() {
        let created_at = Utc::now();
        let instance = DbInstance {
            db_instance_identifier: "test-db".to_string(),
            db_instance_arn: "arn:aws:rds:us-east-1:123456789012:db:test-db".to_string(),
            db_instance_class: "db.t3.micro".to_string(),
            engine: "postgres".to_string(),
            engine_version: "16.3".to_string(),
            db_instance_status: "available".to_string(),
            master_username: "admin".to_string(),
            db_name: Some("appdb".to_string()),
            endpoint_address: "127.0.0.1".to_string(),
            port: 15432,
            allocated_storage: 20,
            publicly_accessible: true,
            deletion_protection: false,
            created_at,
            dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
            master_user_password: "secret123".to_string(),
            container_id: "container".to_string(),
            host_port: 15432,
            tags: Vec::new(),
            read_replica_source_db_instance_identifier: None,
            read_replica_db_instance_identifiers: Vec::new(),
            vpc_security_group_ids: vec!["sg-12345678".to_string()],
            db_parameter_group_name: Some("default.postgres16".to_string()),
            backup_retention_period: 1,
            preferred_backup_window: "03:00-04:00".to_string(),
            latest_restorable_time: Some(created_at),
            option_group_name: None,
            multi_az: false,
            pending_modified_values: None,
        };

        let xml = db_instance_xml(&instance, Some("creating"));

        assert!(xml.contains("<DBInstanceIdentifier>test-db</DBInstanceIdentifier>"));
        assert!(xml.contains("<DBInstanceStatus>creating</DBInstanceStatus>"));
        assert!(xml.contains("<Address>127.0.0.1</Address><Port>15432</Port>"));
    }

    // Tags supplied as `Tags.Tag.N.Key` / `Tags.Tag.N.Value` query parameters
    // must be parsed into `RdsTag`s in index order.
    #[test]
    fn parse_tags_reads_rds_query_shape() {
        let request = request(
            "AddTagsToResource",
            &[
                ("Tags.Tag.1.Key", "env"),
                ("Tags.Tag.1.Value", "dev"),
                ("Tags.Tag.2.Key", "team"),
                ("Tags.Tag.2.Value", "core"),
            ],
        );

        let tags = parse_tags(&request).expect("tags");

        assert_eq!(
            tags,
            vec![
                RdsTag {
                    key: "env".to_string(),
                    value: "dev".to_string(),
                },
                RdsTag {
                    key: "team".to_string(),
                    value: "core".to_string(),
                }
            ]
        );
    }

    // Tag keys supplied as `TagKeys.member.N` query parameters must be parsed
    // into a list of strings in index order.
    #[test]
    fn parse_tag_keys_reads_member_shape() {
        let request = request(
            "RemoveTagsFromResource",
            &[("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")],
        );

        let tag_keys = parse_tag_keys(&request).expect("tag keys");

        assert_eq!(tag_keys, vec!["env".to_string(), "team".to_string()]);
    }

    // Merging must overwrite the value of a tag whose key already exists and
    // append a tag whose key is new, keeping the original tag's position.
    #[test]
    fn merge_tags_updates_existing_values() {
        let mut tags = vec![RdsTag {
            key: "env".to_string(),
            value: "dev".to_string(),
        }];

        merge_tags(
            &mut tags,
            &[
                RdsTag {
                    key: "env".to_string(),
                    value: "prod".to_string(),
                },
                RdsTag {
                    key: "team".to_string(),
                    value: "core".to_string(),
                },
            ],
        );

        assert_eq!(tags.len(), 2);
        assert_eq!(tags[0].value, "prod");
        assert_eq!(tags[1].key, "team");
    }

    // End-to-end check: a `DescribeDBEngineVersions` request filtered to
    // `postgres`, sent through a service with no runtime attached, must return
    // a body containing the response element, the engine, and the `postgres16`
    // parameter group family.
    #[tokio::test]
    async fn describe_engine_versions_returns_xml_body() {
        let service = RdsService::new(Arc::new(RwLock::new(RdsState::new(
            "123456789012",
            "us-east-1",
        ))));
        let request = request("DescribeDBEngineVersions", &[("Engine", "postgres")]);

        let response = service.handle(request).await.expect("response");
        let body = String::from_utf8(response.body.to_vec()).expect("utf8");

        assert!(body.contains("<DescribeDBEngineVersionsResponse"));
        assert!(body.contains("<Engine>postgres</Engine>"));
        assert!(body.contains("<DBParameterGroupFamily>postgres16</DBParameterGroupFamily>"));
    }

    // Test helper: builds a query-protocol POST `AwsRequest` for the `rds`
    // service. The query parameters are `Action=<action>` plus every pair in
    // `params`; a pair with key `Action` would replace the default entry. The
    // body and headers are empty and the remaining fields are fixed test values.
    fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
        let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
        for (key, value) in params {
            query_params.insert((*key).to_string(), (*value).to_string());
        }

        AwsRequest {
            service: "rds".to_string(),
            action: action.to_string(),
            region: "us-east-1".to_string(),
            account_id: "123456789012".to_string(),
            request_id: "test-request-id".to_string(),
            headers: HeaderMap::new(),
            query_params,
            body: Bytes::new(),
            path_segments: vec![],
            raw_path: "/".to_string(),
            raw_query: String::new(),
            method: Method::POST,
            is_query_protocol: true,
            access_key_id: None,
        }
    }
}