Skip to main content

fakecloud_rds/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8
9use fakecloud_aws::xml::xml_escape;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11
12use crate::runtime::{RdsRuntime, RuntimeError};
13use crate::state::{
14    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
15    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsTag, SharedRdsState,
16};
17
/// Namespace URI of the RDS API document version (2014-10-31).
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";

/// Action names reported by `supported_actions()`.
///
/// Kept in alphabetical order; each entry has a matching arm in
/// `RdsService::handle`.
const SUPPORTED_ACTIONS: &[&str] = &[
    // Tagging and instance/group/snapshot creation.
    "AddTagsToResource",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    // Deletion.
    "DeleteDBInstance",
    "DeleteDBParameterGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    // Read-only describe/list calls.
    "DescribeDBEngineVersions",
    "DescribeDBInstances",
    "DescribeDBParameterGroups",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeOrderableDBInstanceOptions",
    "ListTagsForResource",
    // Modification and lifecycle.
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBSubnetGroup",
    "RebootDBInstance",
    "RemoveTagsFromResource",
    "RestoreDBInstanceFromDBSnapshot",
];
44
45pub struct RdsService {
46    state: SharedRdsState,
47    runtime: Option<Arc<RdsRuntime>>,
48}
49
50impl RdsService {
51    pub fn new(state: SharedRdsState) -> Self {
52        Self {
53            state,
54            runtime: None,
55        }
56    }
57
58    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
59        self.runtime = Some(runtime);
60        self
61    }
62
63    /// Return the runtime or a ``ServiceUnavailable`` error if it was not configured.
64    ///
65    /// RDS operations that start, stop, or reach into a database container fail
66    /// with a consistent wire error when the daemon (Docker/Podman) is missing
67    /// rather than each caller restating the message.
68    fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
69        self.runtime.as_ref().ok_or_else(|| {
70            AwsServiceError::aws_error(
71                StatusCode::SERVICE_UNAVAILABLE,
72                "InvalidParameterValue",
73                "Docker/Podman is required for RDS DB instances but is not available",
74            )
75        })
76    }
77}
78
79#[async_trait]
80impl AwsService for RdsService {
81    fn service_name(&self) -> &str {
82        "rds"
83    }
84
85    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
86        match request.action.as_str() {
87            "AddTagsToResource" => self.add_tags_to_resource(&request),
88            "CreateDBInstance" => self.create_db_instance(&request).await,
89            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
90            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
91            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
92            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
93            "DeleteDBInstance" => self.delete_db_instance(&request).await,
94            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
95            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
96            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
97            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
98            "DescribeDBInstances" => self.describe_db_instances(&request),
99            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
100            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
101            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
102            "DescribeOrderableDBInstanceOptions" => {
103                self.describe_orderable_db_instance_options(&request)
104            }
105            "ListTagsForResource" => self.list_tags_for_resource(&request),
106            "ModifyDBInstance" => self.modify_db_instance(&request),
107            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
108            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
109            "RebootDBInstance" => self.reboot_db_instance(&request).await,
110            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
111            "RestoreDBInstanceFromDBSnapshot" => {
112                self.restore_db_instance_from_db_snapshot(&request).await
113            }
114            _ => Err(AwsServiceError::action_not_implemented(
115                self.service_name(),
116                &request.action,
117            )),
118        }
119    }
120
121    fn supported_actions(&self) -> &[&str] {
122        SUPPORTED_ACTIONS
123    }
124}
125
126impl RdsService {
127    async fn create_db_instance(
128        &self,
129        request: &AwsRequest,
130    ) -> Result<AwsResponse, AwsServiceError> {
131        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
132        let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
133        let db_instance_class = required_param(request, "DBInstanceClass")?;
134        let engine = required_param(request, "Engine")?;
135        let master_username = required_param(request, "MasterUsername")?;
136        let master_user_password = required_param(request, "MasterUserPassword")?;
137        let db_name = optional_param(request, "DBName");
138        let engine_version =
139            optional_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
140        let publicly_accessible =
141            parse_optional_bool(optional_param(request, "PubliclyAccessible").as_deref())?
142                .unwrap_or(true);
143        let deletion_protection =
144            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?
145                .unwrap_or(false);
146        let port = optional_i32_param(request, "Port")?
147            .unwrap_or_else(|| default_port_for_engine(&engine));
148        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
149
150        let db_parameter_group_name = optional_param(request, "DBParameterGroupName")
151            .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
152
153        let backup_retention_period =
154            optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
155        let preferred_backup_window = optional_param(request, "PreferredBackupWindow")
156            .unwrap_or_else(|| "03:00-04:00".to_string());
157        let option_group_name = optional_param(request, "OptionGroupName");
158        let multi_az =
159            parse_optional_bool(optional_param(request, "MultiAZ").as_deref())?.unwrap_or(false);
160
161        validate_create_request(
162            &db_instance_identifier,
163            allocated_storage,
164            &db_instance_class,
165            &engine,
166            &engine_version,
167            port,
168        )?;
169
170        {
171            let mut state = self.state.write();
172            if !state.begin_instance_creation(&db_instance_identifier) {
173                return Err(AwsServiceError::aws_error(
174                    StatusCode::BAD_REQUEST,
175                    "DBInstanceAlreadyExists",
176                    format!("DBInstance {} already exists.", db_instance_identifier),
177                ));
178            }
179            // Validate parameter group exists if specified by the caller
180            if let Some(ref pg_name) = db_parameter_group_name {
181                if !state.parameter_groups.contains_key(pg_name) {
182                    state.cancel_instance_creation(&db_instance_identifier);
183                    return Err(AwsServiceError::aws_error(
184                        StatusCode::NOT_FOUND,
185                        "DBParameterGroupNotFound",
186                        format!("DBParameterGroup {} not found.", pg_name),
187                    ));
188                }
189            }
190        }
191
192        let runtime = self.require_runtime()?;
193
194        let logical_db_name = db_name
195            .clone()
196            .unwrap_or_else(|| default_db_name(&engine).to_string());
197        let running = runtime
198            .ensure_postgres(
199                &db_instance_identifier,
200                &engine,
201                &engine_version,
202                &master_username,
203                &master_user_password,
204                &logical_db_name,
205            )
206            .await
207            .map_err(|error| {
208                self.state
209                    .write()
210                    .cancel_instance_creation(&db_instance_identifier);
211                runtime_error_to_service_error(error)
212            })?;
213
214        let mut state = self.state.write();
215        let created_at = Utc::now();
216        let instance = DbInstance {
217            db_instance_identifier: db_instance_identifier.clone(),
218            db_instance_arn: state.db_instance_arn(&db_instance_identifier),
219            db_instance_class: db_instance_class.clone(),
220            engine: engine.clone(),
221            engine_version: engine_version.clone(),
222            db_instance_status: "available".to_string(),
223            master_username: master_username.clone(),
224            db_name: db_name.clone(),
225            endpoint_address: "127.0.0.1".to_string(),
226            port: i32::from(running.host_port),
227            allocated_storage,
228            publicly_accessible,
229            deletion_protection,
230            created_at,
231            dbi_resource_id: state.next_dbi_resource_id(),
232            master_user_password,
233            container_id: running.container_id,
234            host_port: running.host_port,
235            tags: Vec::new(),
236            read_replica_source_db_instance_identifier: None,
237            read_replica_db_instance_identifiers: Vec::new(),
238            vpc_security_group_ids,
239            db_parameter_group_name,
240            backup_retention_period,
241            preferred_backup_window,
242            latest_restorable_time: if backup_retention_period > 0 {
243                Some(created_at)
244            } else {
245                None
246            },
247            option_group_name,
248            multi_az,
249            pending_modified_values: None,
250        };
251        state.finish_instance_creation(instance.clone());
252
253        Ok(AwsResponse::xml(
254            StatusCode::OK,
255            xml_wrap(
256                "CreateDBInstance",
257                &format!(
258                    "<DBInstance>{}</DBInstance>",
259                    db_instance_xml(&instance, Some("creating"))
260                ),
261                &request.request_id,
262            ),
263        ))
264    }
265
266    async fn delete_db_instance(
267        &self,
268        request: &AwsRequest,
269    ) -> Result<AwsResponse, AwsServiceError> {
270        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
271        let skip_final_snapshot =
272            parse_optional_bool(optional_param(request, "SkipFinalSnapshot").as_deref())?
273                .unwrap_or(false);
274        let final_db_snapshot_identifier = optional_param(request, "FinalDBSnapshotIdentifier");
275
276        if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
277            return Err(AwsServiceError::aws_error(
278                StatusCode::BAD_REQUEST,
279                "InvalidParameterCombination",
280                "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
281            ));
282        }
283        if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
284            return Err(AwsServiceError::aws_error(
285                StatusCode::BAD_REQUEST,
286                "InvalidParameterCombination",
287                "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
288            ));
289        }
290
291        // Check deletion protection BEFORE creating snapshot or making any changes
292        {
293            let state = self.state.read();
294            if let Some(instance) = state.instances.get(&db_instance_identifier) {
295                if instance.deletion_protection {
296                    return Err(AwsServiceError::aws_error(
297                        StatusCode::BAD_REQUEST,
298                        "InvalidDBInstanceState",
299                        format!(
300                            "DBInstance {} cannot be deleted because deletion protection is enabled.",
301                            db_instance_identifier
302                        ),
303                    ));
304                }
305            } else {
306                return Err(db_instance_not_found(&db_instance_identifier));
307            }
308        }
309
310        if let Some(ref snapshot_id) = final_db_snapshot_identifier {
311            self.create_final_db_snapshot(&db_instance_identifier, snapshot_id)
312                .await?;
313        }
314
315        let instance = {
316            let mut state = self.state.write();
317            let instance = state
318                .instances
319                .remove(&db_instance_identifier)
320                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
321
322            if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
323                if let Some(source) = state.instances.get_mut(source_id) {
324                    source
325                        .read_replica_db_instance_identifiers
326                        .retain(|id| id != &db_instance_identifier);
327                }
328            }
329
330            for replica_id in &instance.read_replica_db_instance_identifiers {
331                if let Some(replica) = state.instances.get_mut(replica_id) {
332                    replica.read_replica_source_db_instance_identifier = None;
333                }
334            }
335
336            instance
337        };
338
339        if let Some(runtime) = &self.runtime {
340            runtime.stop_container(&db_instance_identifier).await;
341        }
342
343        Ok(AwsResponse::xml(
344            StatusCode::OK,
345            xml_wrap(
346                "DeleteDBInstance",
347                &format!(
348                    "<DBInstance>{}</DBInstance>",
349                    db_instance_xml(&instance, Some("deleting"))
350                ),
351                &request.request_id,
352            ),
353        ))
354    }
355
356    /// Take a final snapshot of an instance that is about to be deleted,
357    /// persisting the dumped database into `state.snapshots`. The DLQ-style
358    /// conflict check runs twice — once under the read lock before paying
359    /// for the dump, once under the write lock before committing — to keep
360    /// concurrent deletes from colliding.
361    async fn create_final_db_snapshot(
362        &self,
363        db_instance_identifier: &str,
364        snapshot_id: &str,
365    ) -> Result<(), AwsServiceError> {
366        let runtime = self.runtime.as_ref().ok_or_else(|| {
367            AwsServiceError::aws_error(
368                StatusCode::SERVICE_UNAVAILABLE,
369                "InvalidParameterValue",
370                "Docker/Podman is required for RDS snapshots but is not available",
371            )
372        })?;
373
374        let (instance_for_snapshot, db_name) = {
375            let state = self.state.read();
376
377            if state.snapshots.contains_key(snapshot_id) {
378                return Err(AwsServiceError::aws_error(
379                    StatusCode::CONFLICT,
380                    "DBSnapshotAlreadyExists",
381                    format!("DBSnapshot {snapshot_id} already exists."),
382                ));
383            }
384
385            let instance = state
386                .instances
387                .get(db_instance_identifier)
388                .cloned()
389                .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
390
391            let default_db = default_db_name(&instance.engine);
392            let db_name = instance
393                .db_name
394                .as_deref()
395                .unwrap_or(default_db)
396                .to_string();
397
398            (instance, db_name)
399        };
400
401        let dump_data = runtime
402            .dump_database(
403                db_instance_identifier,
404                &instance_for_snapshot.engine,
405                &instance_for_snapshot.master_username,
406                &instance_for_snapshot.master_user_password,
407                &db_name,
408            )
409            .await
410            .map_err(runtime_error_to_service_error)?;
411
412        let mut state = self.state.write();
413
414        if state.snapshots.contains_key(snapshot_id) {
415            return Err(AwsServiceError::aws_error(
416                StatusCode::CONFLICT,
417                "DBSnapshotAlreadyExists",
418                format!("DBSnapshot {snapshot_id} already exists."),
419            ));
420        }
421
422        let snapshot_arn = state.db_snapshot_arn(snapshot_id);
423
424        let snapshot = DbSnapshot {
425            db_snapshot_identifier: snapshot_id.to_string(),
426            db_snapshot_arn: snapshot_arn,
427            db_instance_identifier: db_instance_identifier.to_string(),
428            snapshot_create_time: Utc::now(),
429            engine: instance_for_snapshot.engine.clone(),
430            engine_version: instance_for_snapshot.engine_version.clone(),
431            allocated_storage: instance_for_snapshot.allocated_storage,
432            status: "available".to_string(),
433            port: instance_for_snapshot.port,
434            master_username: instance_for_snapshot.master_username.clone(),
435            db_name: instance_for_snapshot.db_name.clone(),
436            dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
437            snapshot_type: "manual".to_string(),
438            master_user_password: instance_for_snapshot.master_user_password.clone(),
439            tags: Vec::new(),
440            dump_data,
441        };
442
443        state.snapshots.insert(snapshot_id.to_string(), snapshot);
444        Ok(())
445    }
446
447    fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
448        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
449        let db_instance_class = optional_param(request, "DBInstanceClass");
450        let deletion_protection =
451            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?;
452        let apply_immediately =
453            parse_optional_bool(optional_param(request, "ApplyImmediately").as_deref())?;
454
455        // Parse VPC security group IDs - only if at least one is provided
456        let vpc_security_group_ids = {
457            let mut ids = Vec::new();
458            for index in 1.. {
459                let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
460                match optional_param(request, &sg_id_name) {
461                    Some(sg_id) => ids.push(sg_id),
462                    None => break,
463                }
464            }
465            if ids.is_empty() {
466                None
467            } else {
468                Some(ids)
469            }
470        };
471
472        if db_instance_class.is_none()
473            && deletion_protection.is_none()
474            && vpc_security_group_ids.is_none()
475        {
476            return Err(AwsServiceError::aws_error(
477                StatusCode::BAD_REQUEST,
478                "InvalidParameterCombination",
479                "At least one supported mutable field must be provided.",
480            ));
481        }
482        if let Some(ref class) = db_instance_class {
483            validate_db_instance_class(class)?;
484        }
485
486        let mut state = self.state.write();
487        let instance = state
488            .instances
489            .get_mut(&db_instance_identifier)
490            .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
491
492        // If ApplyImmediately is false, stage changes as pending
493        if apply_immediately == Some(false) {
494            let pending = instance
495                .pending_modified_values
496                .get_or_insert(Default::default());
497            if let Some(class) = db_instance_class {
498                pending.db_instance_class = Some(class);
499            }
500            // Note: deletion_protection and vpc_security_group_ids are applied immediately
501            // regardless of ApplyImmediately flag (per AWS behavior)
502            if let Some(deletion_protection) = deletion_protection {
503                instance.deletion_protection = deletion_protection;
504            }
505            if let Some(security_group_ids) = vpc_security_group_ids {
506                instance.vpc_security_group_ids = security_group_ids;
507            }
508        } else {
509            // Apply immediately (default behavior)
510            if let Some(class) = db_instance_class {
511                instance.db_instance_class = class;
512            }
513            if let Some(deletion_protection) = deletion_protection {
514                instance.deletion_protection = deletion_protection;
515            }
516            if let Some(security_group_ids) = vpc_security_group_ids {
517                instance.vpc_security_group_ids = security_group_ids;
518            }
519        }
520
521        Ok(AwsResponse::xml(
522            StatusCode::OK,
523            xml_wrap(
524                "ModifyDBInstance",
525                &format!(
526                    "<DBInstance>{}</DBInstance>",
527                    db_instance_xml(instance, Some("modifying"))
528                ),
529                &request.request_id,
530            ),
531        ))
532    }
533
534    async fn reboot_db_instance(
535        &self,
536        request: &AwsRequest,
537    ) -> Result<AwsResponse, AwsServiceError> {
538        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
539        let force_failover =
540            parse_optional_bool(optional_param(request, "ForceFailover").as_deref())?;
541        if force_failover == Some(true) {
542            return Err(AwsServiceError::aws_error(
543                StatusCode::BAD_REQUEST,
544                "InvalidParameterCombination",
545                "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
546            ));
547        }
548
549        let instance = {
550            let state = self.state.read();
551            state
552                .instances
553                .get(&db_instance_identifier)
554                .cloned()
555                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
556        };
557
558        let runtime = self.require_runtime()?;
559
560        let running = runtime
561            .restart_container(
562                &db_instance_identifier,
563                &instance.engine,
564                &instance.master_username,
565                &instance.master_user_password,
566                instance
567                    .db_name
568                    .as_deref()
569                    .unwrap_or(default_db_name(&instance.engine)),
570            )
571            .await
572            .map_err(runtime_error_to_service_error)?;
573
574        let instance = {
575            let mut state = self.state.write();
576            let instance = state
577                .instances
578                .get_mut(&db_instance_identifier)
579                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
580            instance.host_port = running.host_port;
581            instance.port = i32::from(running.host_port);
582
583            // Apply any pending modifications
584            if let Some(pending) = instance.pending_modified_values.take() {
585                if let Some(class) = pending.db_instance_class {
586                    instance.db_instance_class = class;
587                }
588                if let Some(allocated_storage) = pending.allocated_storage {
589                    instance.allocated_storage = allocated_storage;
590                }
591                if let Some(backup_retention_period) = pending.backup_retention_period {
592                    instance.backup_retention_period = backup_retention_period;
593                }
594                if let Some(multi_az) = pending.multi_az {
595                    instance.multi_az = multi_az;
596                }
597                if let Some(engine_version) = pending.engine_version {
598                    instance.engine_version = engine_version;
599                }
600                if let Some(master_user_password) = pending.master_user_password {
601                    instance.master_user_password = master_user_password;
602                }
603            }
604
605            instance.clone()
606        };
607
608        Ok(AwsResponse::xml(
609            StatusCode::OK,
610            xml_wrap(
611                "RebootDBInstance",
612                &format!(
613                    "<DBInstance>{}</DBInstance>",
614                    db_instance_xml(&instance, Some("rebooting"))
615                ),
616                &request.request_id,
617            ),
618        ))
619    }
620
621    fn describe_db_engine_versions(
622        &self,
623        request: &AwsRequest,
624    ) -> Result<AwsResponse, AwsServiceError> {
625        let engine = optional_param(request, "Engine");
626        let engine_version = optional_param(request, "EngineVersion");
627        let family = optional_param(request, "DBParameterGroupFamily");
628        let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
629
630        let mut versions = filter_engine_versions(
631            &default_engine_versions(),
632            &engine,
633            &engine_version,
634            &family,
635        );
636
637        if default_only.unwrap_or(false) {
638            versions.truncate(1);
639        }
640
641        Ok(AwsResponse::xml(
642            StatusCode::OK,
643            xml_wrap(
644                "DescribeDBEngineVersions",
645                &format!(
646                    "<DBEngineVersions>{}</DBEngineVersions>",
647                    versions.iter().map(engine_version_xml).collect::<String>()
648                ),
649                &request.request_id,
650            ),
651        ))
652    }
653
654    fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
655        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
656        let marker = optional_param(request, "Marker");
657        let max_records = optional_param(request, "MaxRecords");
658
659        let state = self.state.read();
660
661        // If specific identifier requested, return just that one (no pagination)
662        if let Some(identifier) = db_instance_identifier {
663            let instance = state
664                .instances
665                .get(&identifier)
666                .cloned()
667                .ok_or_else(|| db_instance_not_found(&identifier))?;
668
669            return Ok(AwsResponse::xml(
670                StatusCode::OK,
671                xml_wrap(
672                    "DescribeDBInstances",
673                    &format!(
674                        "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
675                        db_instance_xml(&instance, None)
676                    ),
677                    &request.request_id,
678                ),
679            ));
680        }
681
682        // Get all instances sorted by created_at, then identifier
683        let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
684        instances.sort_by(|a, b| {
685            a.created_at
686                .cmp(&b.created_at)
687                .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
688        });
689
690        // Apply pagination
691        let paginated = paginate(instances, marker, max_records, |inst| {
692            &inst.db_instance_identifier
693        })?;
694
695        let marker_xml = paginated
696            .next_marker
697            .as_ref()
698            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
699            .unwrap_or_default();
700
701        Ok(AwsResponse::xml(
702            StatusCode::OK,
703            xml_wrap(
704                "DescribeDBInstances",
705                &format!(
706                    "<DBInstances>{}</DBInstances>{}",
707                    paginated
708                        .items
709                        .iter()
710                        .map(|instance| {
711                            format!(
712                                "<DBInstance>{}</DBInstance>",
713                                db_instance_xml(instance, None)
714                            )
715                        })
716                        .collect::<String>(),
717                    marker_xml
718                ),
719                &request.request_id,
720            ),
721        ))
722    }
723
724    fn describe_orderable_db_instance_options(
725        &self,
726        request: &AwsRequest,
727    ) -> Result<AwsResponse, AwsServiceError> {
728        let engine = optional_param(request, "Engine");
729        let engine_version = optional_param(request, "EngineVersion");
730        let db_instance_class = optional_param(request, "DBInstanceClass");
731        let license_model = optional_param(request, "LicenseModel");
732        let vpc = parse_optional_bool(optional_param(request, "Vpc").as_deref())?;
733
734        let options = filter_orderable_options(
735            &default_orderable_options(),
736            &engine,
737            &engine_version,
738            &db_instance_class,
739            &license_model,
740            vpc,
741        );
742
743        Ok(AwsResponse::xml(
744            StatusCode::OK,
745            xml_wrap(
746                "DescribeOrderableDBInstanceOptions",
747                &format!(
748                    "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
749                    options.iter().map(orderable_option_xml).collect::<String>()
750                ),
751                &request.request_id,
752            ),
753        ))
754    }
755
756    fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
757        let resource_name = required_param(request, "ResourceName")?;
758        let tags = parse_tags(request)?;
759
760        if tags.is_empty() {
761            return Err(AwsServiceError::aws_error(
762                StatusCode::BAD_REQUEST,
763                "MissingParameter",
764                "The request must contain the parameter Tags.",
765            ));
766        }
767
768        let mut state = self.state.write();
769        let instance = find_instance_by_arn_mut(&mut state, &resource_name)?;
770        merge_tags(&mut instance.tags, &tags);
771
772        Ok(AwsResponse::xml(
773            StatusCode::OK,
774            xml_wrap("AddTagsToResource", "", &request.request_id),
775        ))
776    }
777
778    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
779        let resource_name = required_param(request, "ResourceName")?;
780        if query_param_prefix_exists(request, "Filters.") {
781            return Err(AwsServiceError::aws_error(
782                StatusCode::BAD_REQUEST,
783                "InvalidParameterValue",
784                "Filters are not yet supported for ListTagsForResource.",
785            ));
786        }
787
788        let state = self.state.read();
789        let instance = find_instance_by_arn(&state, &resource_name)?;
790        let tag_xml = instance.tags.iter().map(tag_xml).collect::<String>();
791
792        Ok(AwsResponse::xml(
793            StatusCode::OK,
794            xml_wrap(
795                "ListTagsForResource",
796                &format!("<TagList>{tag_xml}</TagList>"),
797                &request.request_id,
798            ),
799        ))
800    }
801
802    fn remove_tags_from_resource(
803        &self,
804        request: &AwsRequest,
805    ) -> Result<AwsResponse, AwsServiceError> {
806        let resource_name = required_param(request, "ResourceName")?;
807        let tag_keys = parse_tag_keys(request)?;
808
809        if tag_keys.is_empty() {
810            return Err(AwsServiceError::aws_error(
811                StatusCode::BAD_REQUEST,
812                "MissingParameter",
813                "The request must contain the parameter TagKeys.",
814            ));
815        }
816
817        let mut state = self.state.write();
818        let instance = find_instance_by_arn_mut(&mut state, &resource_name)?;
819        instance
820            .tags
821            .retain(|tag| !tag_keys.iter().any(|key| key == &tag.key));
822
823        Ok(AwsResponse::xml(
824            StatusCode::OK,
825            xml_wrap("RemoveTagsFromResource", "", &request.request_id),
826        ))
827    }
828
829    async fn create_db_snapshot(
830        &self,
831        request: &AwsRequest,
832    ) -> Result<AwsResponse, AwsServiceError> {
833        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
834        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
835
836        let runtime = self.runtime.as_ref().ok_or_else(|| {
837            AwsServiceError::aws_error(
838                StatusCode::SERVICE_UNAVAILABLE,
839                "InvalidParameterValue",
840                "Docker/Podman is required for RDS snapshots but is not available",
841            )
842        })?;
843
844        let (instance, db_name) = {
845            let state = self.state.write();
846
847            if state.snapshots.contains_key(&db_snapshot_identifier) {
848                return Err(AwsServiceError::aws_error(
849                    StatusCode::CONFLICT,
850                    "DBSnapshotAlreadyExists",
851                    format!("DBSnapshot {db_snapshot_identifier} already exists."),
852                ));
853            }
854
855            let instance = state
856                .instances
857                .get(&db_instance_identifier)
858                .cloned()
859                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
860
861            let default_db = default_db_name(&instance.engine);
862            let db_name = instance
863                .db_name
864                .as_deref()
865                .unwrap_or(default_db)
866                .to_string();
867
868            (instance, db_name)
869        };
870
871        let dump_data = runtime
872            .dump_database(
873                &db_instance_identifier,
874                &instance.engine,
875                &instance.master_username,
876                &instance.master_user_password,
877                &db_name,
878            )
879            .await
880            .map_err(runtime_error_to_service_error)?;
881
882        let mut state = self.state.write();
883
884        if state.snapshots.contains_key(&db_snapshot_identifier) {
885            return Err(AwsServiceError::aws_error(
886                StatusCode::CONFLICT,
887                "DBSnapshotAlreadyExists",
888                format!("DBSnapshot {db_snapshot_identifier} already exists."),
889            ));
890        }
891
892        let snapshot = DbSnapshot {
893            db_snapshot_identifier: db_snapshot_identifier.clone(),
894            db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
895            db_instance_identifier: instance.db_instance_identifier.clone(),
896            snapshot_create_time: Utc::now(),
897            engine: instance.engine.clone(),
898            engine_version: instance.engine_version.clone(),
899            allocated_storage: instance.allocated_storage,
900            status: "available".to_string(),
901            port: instance.port,
902            master_username: instance.master_username.clone(),
903            db_name: instance.db_name.clone(),
904            dbi_resource_id: instance.dbi_resource_id.clone(),
905            snapshot_type: "manual".to_string(),
906            master_user_password: instance.master_user_password.clone(),
907            tags: Vec::new(),
908            dump_data,
909        };
910
911        state
912            .snapshots
913            .insert(db_snapshot_identifier, snapshot.clone());
914
915        Ok(AwsResponse::xml(
916            StatusCode::OK,
917            xml_wrap(
918                "CreateDBSnapshot",
919                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
920                &request.request_id,
921            ),
922        ))
923    }
924
925    fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
926        let db_snapshot_identifier = optional_param(request, "DBSnapshotIdentifier");
927        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
928        let marker = optional_param(request, "Marker");
929        let max_records = optional_param(request, "MaxRecords");
930
931        if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
932            return Err(AwsServiceError::aws_error(
933                StatusCode::BAD_REQUEST,
934                "InvalidParameterCombination",
935                "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
936            ));
937        }
938
939        let state = self.state.read();
940
941        // If specific snapshot requested, return just that one (no pagination)
942        if let Some(snapshot_id) = db_snapshot_identifier {
943            let snapshot = state
944                .snapshots
945                .get(&snapshot_id)
946                .cloned()
947                .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
948
949            return Ok(AwsResponse::xml(
950                StatusCode::OK,
951                xml_wrap(
952                    "DescribeDBSnapshots",
953                    &format!(
954                        "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
955                        db_snapshot_xml(&snapshot)
956                    ),
957                    &request.request_id,
958                ),
959            ));
960        }
961
962        // Get snapshots, filtered by instance identifier if provided
963        let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
964            state
965                .snapshots
966                .values()
967                .filter(|s| s.db_instance_identifier == instance_id)
968                .cloned()
969                .collect()
970        } else {
971            state.snapshots.values().cloned().collect()
972        };
973
974        // Sort by creation time, then identifier
975        snapshots.sort_by(|a, b| {
976            a.snapshot_create_time
977                .cmp(&b.snapshot_create_time)
978                .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
979        });
980
981        // Apply pagination
982        let paginated = paginate(snapshots, marker, max_records, |snap| {
983            &snap.db_snapshot_identifier
984        })?;
985
986        let marker_xml = paginated
987            .next_marker
988            .as_ref()
989            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
990            .unwrap_or_default();
991
992        Ok(AwsResponse::xml(
993            StatusCode::OK,
994            xml_wrap(
995                "DescribeDBSnapshots",
996                &format!(
997                    "<DBSnapshots>{}</DBSnapshots>{}",
998                    paginated
999                        .items
1000                        .iter()
1001                        .map(|snapshot| format!(
1002                            "<DBSnapshot>{}</DBSnapshot>",
1003                            db_snapshot_xml(snapshot)
1004                        ))
1005                        .collect::<String>(),
1006                    marker_xml
1007                ),
1008                &request.request_id,
1009            ),
1010        ))
1011    }
1012
1013    fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1014        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1015
1016        let mut state = self.state.write();
1017
1018        let snapshot = state
1019            .snapshots
1020            .remove(&db_snapshot_identifier)
1021            .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1022
1023        Ok(AwsResponse::xml(
1024            StatusCode::OK,
1025            xml_wrap(
1026                "DeleteDBSnapshot",
1027                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1028                &request.request_id,
1029            ),
1030        ))
1031    }
1032
1033    async fn restore_db_instance_from_db_snapshot(
1034        &self,
1035        request: &AwsRequest,
1036    ) -> Result<AwsResponse, AwsServiceError> {
1037        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1038        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1039        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
1040
1041        let runtime = self.require_runtime()?;
1042
1043        let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
1044            let mut state = self.state.write();
1045
1046            if !state.begin_instance_creation(&db_instance_identifier) {
1047                return Err(AwsServiceError::aws_error(
1048                    StatusCode::CONFLICT,
1049                    "DBInstanceAlreadyExists",
1050                    format!("DBInstance {db_instance_identifier} already exists."),
1051                ));
1052            }
1053
1054            let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
1055                Some(s) => s,
1056                None => {
1057                    state.cancel_instance_creation(&db_instance_identifier);
1058                    return Err(db_snapshot_not_found(&db_snapshot_identifier));
1059                }
1060            };
1061
1062            let dbi_resource_id = state.next_dbi_resource_id();
1063            let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
1064            let created_at = Utc::now();
1065
1066            (snapshot, dbi_resource_id, db_instance_arn, created_at)
1067        };
1068
1069        let db_name = snapshot
1070            .db_name
1071            .as_deref()
1072            .unwrap_or(default_db_name(&snapshot.engine));
1073        let running = match runtime
1074            .ensure_postgres(
1075                &db_instance_identifier,
1076                &snapshot.engine,
1077                &snapshot.engine_version,
1078                &snapshot.master_username,
1079                &snapshot.master_user_password,
1080                db_name,
1081            )
1082            .await
1083        {
1084            Ok(running) => running,
1085            Err(e) => {
1086                self.state
1087                    .write()
1088                    .cancel_instance_creation(&db_instance_identifier);
1089                return Err(runtime_error_to_service_error(e));
1090            }
1091        };
1092
1093        if let Err(e) = runtime
1094            .restore_database(
1095                &db_instance_identifier,
1096                &snapshot.engine,
1097                &snapshot.master_username,
1098                &snapshot.master_user_password,
1099                db_name,
1100                &snapshot.dump_data,
1101            )
1102            .await
1103        {
1104            self.state
1105                .write()
1106                .cancel_instance_creation(&db_instance_identifier);
1107            runtime.stop_container(&db_instance_identifier).await;
1108            return Err(runtime_error_to_service_error(e));
1109        }
1110
1111        let instance = build_restored_instance(
1112            &db_instance_identifier,
1113            db_instance_arn,
1114            dbi_resource_id,
1115            created_at,
1116            vpc_security_group_ids,
1117            &snapshot,
1118            &running,
1119        );
1120
1121        self.state
1122            .write()
1123            .finish_instance_creation(instance.clone());
1124
1125        Ok(AwsResponse::xml(
1126            StatusCode::OK,
1127            xml_wrap(
1128                "RestoreDBInstanceFromDBSnapshot",
1129                &format!(
1130                    "<DBInstance>{}</DBInstance>",
1131                    db_instance_xml(&instance, None)
1132                ),
1133                &request.request_id,
1134            ),
1135        ))
1136    }
1137
1138    async fn create_db_instance_read_replica(
1139        &self,
1140        request: &AwsRequest,
1141    ) -> Result<AwsResponse, AwsServiceError> {
1142        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1143        let source_db_instance_identifier = required_param(request, "SourceDBInstanceIdentifier")?;
1144
1145        let runtime = self.runtime.as_ref().ok_or_else(|| {
1146            AwsServiceError::aws_error(
1147                StatusCode::SERVICE_UNAVAILABLE,
1148                "InvalidParameterValue",
1149                "Docker/Podman is required for RDS read replicas but is not available",
1150            )
1151        })?;
1152
1153        let (source_instance, db_name) = {
1154            let mut state = self.state.write();
1155
1156            if !state.begin_instance_creation(&db_instance_identifier) {
1157                return Err(AwsServiceError::aws_error(
1158                    StatusCode::CONFLICT,
1159                    "DBInstanceAlreadyExists",
1160                    format!("DBInstance {db_instance_identifier} already exists."),
1161                ));
1162            }
1163
1164            let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
1165            {
1166                Some(inst) => inst,
1167                None => {
1168                    state.cancel_instance_creation(&db_instance_identifier);
1169                    return Err(db_instance_not_found(&source_db_instance_identifier));
1170                }
1171            };
1172
1173            let default_db = default_db_name(&source_instance.engine);
1174            let db_name = source_instance
1175                .db_name
1176                .as_deref()
1177                .unwrap_or(default_db)
1178                .to_string();
1179
1180            (source_instance, db_name)
1181        };
1182
1183        let dump_data = match runtime
1184            .dump_database(
1185                &source_db_instance_identifier,
1186                &source_instance.engine,
1187                &source_instance.master_username,
1188                &source_instance.master_user_password,
1189                &db_name,
1190            )
1191            .await
1192        {
1193            Ok(data) => data,
1194            Err(e) => {
1195                self.state
1196                    .write()
1197                    .cancel_instance_creation(&db_instance_identifier);
1198                return Err(runtime_error_to_service_error(e));
1199            }
1200        };
1201
1202        let dbi_resource_id = self.state.read().next_dbi_resource_id();
1203        let db_instance_arn = self.state.read().db_instance_arn(&db_instance_identifier);
1204        let created_at = Utc::now();
1205
1206        let running = match runtime
1207            .ensure_postgres(
1208                &db_instance_identifier,
1209                &source_instance.engine,
1210                &source_instance.engine_version,
1211                &source_instance.master_username,
1212                &source_instance.master_user_password,
1213                &db_name,
1214            )
1215            .await
1216        {
1217            Ok(running) => running,
1218            Err(e) => {
1219                self.state
1220                    .write()
1221                    .cancel_instance_creation(&db_instance_identifier);
1222                return Err(runtime_error_to_service_error(e));
1223            }
1224        };
1225
1226        if let Err(e) = runtime
1227            .restore_database(
1228                &db_instance_identifier,
1229                &source_instance.engine,
1230                &source_instance.master_username,
1231                &source_instance.master_user_password,
1232                &db_name,
1233                &dump_data,
1234            )
1235            .await
1236        {
1237            self.state
1238                .write()
1239                .cancel_instance_creation(&db_instance_identifier);
1240            runtime.stop_container(&db_instance_identifier).await;
1241            return Err(runtime_error_to_service_error(e));
1242        }
1243
1244        let replica = build_read_replica_instance(
1245            &db_instance_identifier,
1246            db_instance_arn,
1247            dbi_resource_id,
1248            created_at,
1249            &source_db_instance_identifier,
1250            &source_instance,
1251            &running,
1252        );
1253
1254        let source_missing = {
1255            let mut state = self.state.write();
1256            match state.instances.get_mut(&source_db_instance_identifier) {
1257                Some(source) => {
1258                    source
1259                        .read_replica_db_instance_identifiers
1260                        .push(db_instance_identifier.clone());
1261                    state.finish_instance_creation(replica.clone());
1262                    false
1263                }
1264                None => {
1265                    state.cancel_instance_creation(&db_instance_identifier);
1266                    true
1267                }
1268            }
1269        };
1270
1271        if source_missing {
1272            runtime.stop_container(&db_instance_identifier).await;
1273            return Err(db_instance_not_found(&source_db_instance_identifier));
1274        }
1275
1276        Ok(AwsResponse::xml(
1277            StatusCode::OK,
1278            xml_wrap(
1279                "CreateDBInstanceReadReplica",
1280                &format!(
1281                    "<DBInstance>{}</DBInstance>",
1282                    db_instance_xml(&replica, None)
1283                ),
1284                &request.request_id,
1285            ),
1286        ))
1287    }
1288
1289    fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1290        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1291        let db_subnet_group_description = required_param(request, "DBSubnetGroupDescription")?;
1292        let subnet_ids = parse_subnet_ids(request)?;
1293
1294        if subnet_ids.is_empty() {
1295            return Err(AwsServiceError::aws_error(
1296                StatusCode::BAD_REQUEST,
1297                "InvalidParameterValue",
1298                "At least one subnet must be specified.",
1299            ));
1300        }
1301
1302        if subnet_ids.len() < 2 {
1303            return Err(AwsServiceError::aws_error(
1304                StatusCode::BAD_REQUEST,
1305                "DBSubnetGroupDoesNotCoverEnoughAZs",
1306                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1307            ));
1308        }
1309
1310        let mut state = self.state.write();
1311
1312        if state.subnet_groups.contains_key(&db_subnet_group_name) {
1313            return Err(AwsServiceError::aws_error(
1314                StatusCode::CONFLICT,
1315                "DBSubnetGroupAlreadyExists",
1316                format!("DBSubnetGroup {db_subnet_group_name} already exists."),
1317            ));
1318        }
1319
1320        let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
1321        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1322            .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
1323            .collect();
1324
1325        // Validate that subnets span at least 2 unique Availability Zones
1326        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1327        if unique_azs.len() < 2 {
1328            return Err(AwsServiceError::aws_error(
1329                StatusCode::BAD_REQUEST,
1330                "DBSubnetGroupDoesNotCoverEnoughAZs",
1331                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1332            ));
1333        }
1334
1335        let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
1336        let tags = parse_tags(request)?;
1337
1338        let subnet_group = DbSubnetGroup {
1339            db_subnet_group_name: db_subnet_group_name.clone(),
1340            db_subnet_group_arn,
1341            db_subnet_group_description,
1342            vpc_id,
1343            subnet_ids,
1344            subnet_availability_zones,
1345            tags,
1346        };
1347
1348        state
1349            .subnet_groups
1350            .insert(db_subnet_group_name, subnet_group.clone());
1351
1352        Ok(AwsResponse::xml(
1353            StatusCode::OK,
1354            xml_wrap(
1355                "CreateDBSubnetGroup",
1356                &format!(
1357                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1358                    db_subnet_group_xml(&subnet_group)
1359                ),
1360                &request.request_id,
1361            ),
1362        ))
1363    }
1364
1365    fn describe_db_subnet_groups(
1366        &self,
1367        request: &AwsRequest,
1368    ) -> Result<AwsResponse, AwsServiceError> {
1369        let db_subnet_group_name = optional_param(request, "DBSubnetGroupName");
1370        let marker = optional_param(request, "Marker");
1371        let max_records = optional_param(request, "MaxRecords");
1372
1373        let state = self.state.read();
1374
1375        // If specific subnet group requested, return just that one (no pagination)
1376        if let Some(name) = db_subnet_group_name {
1377            let sg = state.subnet_groups.get(&name).ok_or_else(|| {
1378                AwsServiceError::aws_error(
1379                    StatusCode::NOT_FOUND,
1380                    "DBSubnetGroupNotFoundFault",
1381                    format!("DBSubnetGroup {} not found.", name),
1382                )
1383            })?;
1384
1385            return Ok(AwsResponse::xml(
1386                StatusCode::OK,
1387                xml_wrap(
1388                    "DescribeDBSubnetGroups",
1389                    &format!(
1390                        "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
1391                        db_subnet_group_xml(sg)
1392                    ),
1393                    &request.request_id,
1394                ),
1395            ));
1396        }
1397
1398        // Get all subnet groups sorted by name
1399        let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
1400        subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
1401
1402        // Apply pagination
1403        let paginated = paginate(subnet_groups, marker, max_records, |sg| {
1404            &sg.db_subnet_group_name
1405        })?;
1406
1407        let marker_xml = paginated
1408            .next_marker
1409            .as_ref()
1410            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1411            .unwrap_or_default();
1412
1413        let body = paginated
1414            .items
1415            .iter()
1416            .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
1417            .collect::<Vec<_>>()
1418            .join("");
1419
1420        Ok(AwsResponse::xml(
1421            StatusCode::OK,
1422            xml_wrap(
1423                "DescribeDBSubnetGroups",
1424                &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
1425                &request.request_id,
1426            ),
1427        ))
1428    }
1429
1430    fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1431        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1432
1433        let mut state = self.state.write();
1434
1435        if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
1436            return Err(AwsServiceError::aws_error(
1437                StatusCode::NOT_FOUND,
1438                "DBSubnetGroupNotFoundFault",
1439                format!("DBSubnetGroup {db_subnet_group_name} not found."),
1440            ));
1441        }
1442
1443        Ok(AwsResponse::xml(
1444            StatusCode::OK,
1445            xml_wrap("DeleteDBSubnetGroup", "", &request.request_id),
1446        ))
1447    }
1448
1449    fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1450        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1451        let subnet_ids = parse_subnet_ids(request)?;
1452
1453        if subnet_ids.is_empty() {
1454            return Err(AwsServiceError::aws_error(
1455                StatusCode::BAD_REQUEST,
1456                "InvalidParameterValue",
1457                "At least one subnet must be specified.",
1458            ));
1459        }
1460
1461        if subnet_ids.len() < 2 {
1462            return Err(AwsServiceError::aws_error(
1463                StatusCode::BAD_REQUEST,
1464                "DBSubnetGroupDoesNotCoverEnoughAZs",
1465                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1466            ));
1467        }
1468
1469        let mut state = self.state.write();
1470
1471        let region = state.region.clone();
1472
1473        let subnet_group = state
1474            .subnet_groups
1475            .get_mut(&db_subnet_group_name)
1476            .ok_or_else(|| {
1477                AwsServiceError::aws_error(
1478                    StatusCode::NOT_FOUND,
1479                    "DBSubnetGroupNotFoundFault",
1480                    format!("DBSubnetGroup {db_subnet_group_name} not found."),
1481                )
1482            })?;
1483
1484        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1485            .map(|i| format!("{}{}", &region, char::from(b'a' + (i % 6) as u8)))
1486            .collect();
1487
1488        // Validate that subnets span at least 2 unique Availability Zones
1489        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1490        if unique_azs.len() < 2 {
1491            return Err(AwsServiceError::aws_error(
1492                StatusCode::BAD_REQUEST,
1493                "DBSubnetGroupDoesNotCoverEnoughAZs",
1494                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1495            ));
1496        }
1497
1498        subnet_group.subnet_ids = subnet_ids;
1499        subnet_group.subnet_availability_zones = subnet_availability_zones;
1500
1501        let subnet_group_clone = subnet_group.clone();
1502
1503        Ok(AwsResponse::xml(
1504            StatusCode::OK,
1505            xml_wrap(
1506                "ModifyDBSubnetGroup",
1507                &format!(
1508                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1509                    db_subnet_group_xml(&subnet_group_clone)
1510                ),
1511                &request.request_id,
1512            ),
1513        ))
1514    }
1515
1516    fn create_db_parameter_group(
1517        &self,
1518        request: &AwsRequest,
1519    ) -> Result<AwsResponse, AwsServiceError> {
1520        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1521        let db_parameter_group_family = required_param(request, "DBParameterGroupFamily")?;
1522        let description = required_param(request, "Description")?;
1523
1524        // Validate parameter group family against supported engines and versions
1525        let valid_families = [
1526            "postgres16",
1527            "postgres15",
1528            "postgres14",
1529            "postgres13",
1530            "mysql8.0",
1531            "mysql5.7",
1532            "mariadb10.11",
1533            "mariadb10.6",
1534        ];
1535
1536        if !valid_families.contains(&db_parameter_group_family.as_str()) {
1537            return Err(AwsServiceError::aws_error(
1538                StatusCode::BAD_REQUEST,
1539                "InvalidParameterValue",
1540                format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
1541            ));
1542        }
1543
1544        let mut state = self.state.write();
1545
1546        if state
1547            .parameter_groups
1548            .contains_key(&db_parameter_group_name)
1549        {
1550            return Err(AwsServiceError::aws_error(
1551                StatusCode::CONFLICT,
1552                "DBParameterGroupAlreadyExists",
1553                format!("DBParameterGroup {db_parameter_group_name} already exists."),
1554            ));
1555        }
1556
1557        let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
1558        let tags = parse_tags(request)?;
1559
1560        let parameter_group = DbParameterGroup {
1561            db_parameter_group_name: db_parameter_group_name.clone(),
1562            db_parameter_group_arn,
1563            db_parameter_group_family,
1564            description,
1565            parameters: std::collections::HashMap::new(),
1566            tags,
1567        };
1568
1569        state
1570            .parameter_groups
1571            .insert(db_parameter_group_name, parameter_group.clone());
1572
1573        Ok(AwsResponse::xml(
1574            StatusCode::OK,
1575            xml_wrap(
1576                "CreateDBParameterGroup",
1577                &format!(
1578                    "<DBParameterGroup>{}</DBParameterGroup>",
1579                    db_parameter_group_xml(&parameter_group)
1580                ),
1581                &request.request_id,
1582            ),
1583        ))
1584    }
1585
1586    fn describe_db_parameter_groups(
1587        &self,
1588        request: &AwsRequest,
1589    ) -> Result<AwsResponse, AwsServiceError> {
1590        let db_parameter_group_name = optional_param(request, "DBParameterGroupName");
1591        let marker = optional_param(request, "Marker");
1592        let max_records = optional_param(request, "MaxRecords");
1593
1594        let state = self.state.read();
1595
1596        // If specific parameter group requested, return just that one (no pagination)
1597        if let Some(name) = db_parameter_group_name {
1598            let pg = state.parameter_groups.get(&name).ok_or_else(|| {
1599                AwsServiceError::aws_error(
1600                    StatusCode::NOT_FOUND,
1601                    "DBParameterGroupNotFound",
1602                    format!("DBParameterGroup {} not found.", name),
1603                )
1604            })?;
1605
1606            return Ok(AwsResponse::xml(
1607                StatusCode::OK,
1608                xml_wrap(
1609                    "DescribeDBParameterGroups",
1610                    &format!(
1611                        "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
1612                        db_parameter_group_xml(pg)
1613                    ),
1614                    &request.request_id,
1615                ),
1616            ));
1617        }
1618
1619        // Get all parameter groups sorted by name
1620        let mut parameter_groups: Vec<DbParameterGroup> =
1621            state.parameter_groups.values().cloned().collect();
1622        parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
1623
1624        // Apply pagination
1625        let paginated = paginate(parameter_groups, marker, max_records, |pg| {
1626            &pg.db_parameter_group_name
1627        })?;
1628
1629        let marker_xml = paginated
1630            .next_marker
1631            .as_ref()
1632            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1633            .unwrap_or_default();
1634
1635        let body = paginated
1636            .items
1637            .iter()
1638            .map(|pg| {
1639                format!(
1640                    "<DBParameterGroup>{}</DBParameterGroup>",
1641                    db_parameter_group_xml(pg)
1642                )
1643            })
1644            .collect::<Vec<_>>()
1645            .join("");
1646
1647        Ok(AwsResponse::xml(
1648            StatusCode::OK,
1649            xml_wrap(
1650                "DescribeDBParameterGroups",
1651                &format!(
1652                    "<DBParameterGroups>{}</DBParameterGroups>{}",
1653                    body, marker_xml
1654                ),
1655                &request.request_id,
1656            ),
1657        ))
1658    }
1659
1660    fn delete_db_parameter_group(
1661        &self,
1662        request: &AwsRequest,
1663    ) -> Result<AwsResponse, AwsServiceError> {
1664        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1665
1666        let mut state = self.state.write();
1667
1668        if db_parameter_group_name.starts_with("default.") {
1669            return Err(AwsServiceError::aws_error(
1670                StatusCode::BAD_REQUEST,
1671                "InvalidParameterValue",
1672                "Cannot delete default parameter groups.",
1673            ));
1674        }
1675
1676        if state
1677            .parameter_groups
1678            .remove(&db_parameter_group_name)
1679            .is_none()
1680        {
1681            return Err(AwsServiceError::aws_error(
1682                StatusCode::NOT_FOUND,
1683                "DBParameterGroupNotFound",
1684                format!("DBParameterGroup {db_parameter_group_name} not found."),
1685            ));
1686        }
1687
1688        Ok(AwsResponse::xml(
1689            StatusCode::OK,
1690            xml_wrap("DeleteDBParameterGroup", "", &request.request_id),
1691        ))
1692    }
1693
1694    fn modify_db_parameter_group(
1695        &self,
1696        request: &AwsRequest,
1697    ) -> Result<AwsResponse, AwsServiceError> {
1698        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1699
1700        let mut state = self.state.write();
1701
1702        let parameter_group = state
1703            .parameter_groups
1704            .get_mut(&db_parameter_group_name)
1705            .ok_or_else(|| {
1706                AwsServiceError::aws_error(
1707                    StatusCode::NOT_FOUND,
1708                    "DBParameterGroupNotFound",
1709                    format!("DBParameterGroup {db_parameter_group_name} not found."),
1710                )
1711            })?;
1712
1713        if let Some(new_description) = optional_param(request, "Description") {
1714            parameter_group.description = new_description;
1715        }
1716
1717        let parameter_group_clone = parameter_group.clone();
1718
1719        Ok(AwsResponse::xml(
1720            StatusCode::OK,
1721            xml_wrap(
1722                "ModifyDBParameterGroup",
1723                &format!(
1724                    "<DBParameterGroupName>{}</DBParameterGroupName>",
1725                    xml_escape(&parameter_group_clone.db_parameter_group_name)
1726                ),
1727                &request.request_id,
1728            ),
1729        ))
1730    }
1731}
1732
1733fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
1734    fakecloud_core::query::optional_query_param(req, name)
1735}
1736
1737fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
1738    fakecloud_core::query::required_query_param(req, name)
1739}
1740
1741fn required_i32_param(req: &AwsRequest, name: &str) -> Result<i32, AwsServiceError> {
1742    let value = required_param(req, name)?;
1743    value.parse::<i32>().map_err(|_| {
1744        AwsServiceError::aws_error(
1745            StatusCode::BAD_REQUEST,
1746            "InvalidParameterValue",
1747            format!("Parameter {name} must be a valid integer."),
1748        )
1749    })
1750}
1751
1752fn optional_i32_param(req: &AwsRequest, name: &str) -> Result<Option<i32>, AwsServiceError> {
1753    optional_param(req, name)
1754        .map(|value| {
1755            value.parse::<i32>().map_err(|_| {
1756                AwsServiceError::aws_error(
1757                    StatusCode::BAD_REQUEST,
1758                    "InvalidParameterValue",
1759                    format!("Parameter {name} must be a valid integer."),
1760                )
1761            })
1762        })
1763        .transpose()
1764}
1765
1766fn parse_tags(req: &AwsRequest) -> Result<Vec<RdsTag>, AwsServiceError> {
1767    let mut tags = Vec::new();
1768    for index in 1.. {
1769        let key_name = format!("Tags.Tag.{index}.Key");
1770        let value_name = format!("Tags.Tag.{index}.Value");
1771        let key = optional_param(req, &key_name);
1772        let value = optional_param(req, &value_name);
1773
1774        match (key, value) {
1775            (Some(key), Some(value)) => tags.push(RdsTag { key, value }),
1776            (None, None) => break,
1777            _ => {
1778                return Err(AwsServiceError::aws_error(
1779                    StatusCode::BAD_REQUEST,
1780                    "InvalidParameterValue",
1781                    "Each tag must include both Key and Value.",
1782                ));
1783            }
1784        }
1785    }
1786
1787    Ok(tags)
1788}
1789
1790fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
1791    let mut keys = Vec::new();
1792    for index in 1.. {
1793        let key_name = format!("TagKeys.member.{index}");
1794        match optional_param(req, &key_name) {
1795            Some(key) => keys.push(key),
1796            None => break,
1797        }
1798    }
1799
1800    Ok(keys)
1801}
1802
1803fn parse_subnet_ids(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
1804    let mut subnet_ids = Vec::new();
1805    for index in 1.. {
1806        let subnet_id_name = format!("SubnetIds.SubnetIdentifier.{index}");
1807        match optional_param(req, &subnet_id_name) {
1808            Some(subnet_id) => subnet_ids.push(subnet_id),
1809            None => break,
1810        }
1811    }
1812
1813    Ok(subnet_ids)
1814}
1815
1816fn parse_vpc_security_group_ids(req: &AwsRequest) -> Vec<String> {
1817    let mut security_group_ids = Vec::new();
1818    for index in 1.. {
1819        let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
1820        match optional_param(req, &sg_id_name) {
1821            Some(sg_id) => security_group_ids.push(sg_id),
1822            None => break,
1823        }
1824    }
1825
1826    // If no security groups provided, return a default one
1827    if security_group_ids.is_empty() {
1828        security_group_ids.push("sg-default".to_string());
1829    }
1830
1831    security_group_ids
1832}
1833
1834fn query_param_prefix_exists(req: &AwsRequest, prefix: &str) -> bool {
1835    req.query_params.keys().any(|key| key.starts_with(prefix))
1836}
1837
1838fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
1839    value
1840        .map(|raw| match raw {
1841            "true" | "True" | "TRUE" => Ok(true),
1842            "false" | "False" | "FALSE" => Ok(false),
1843            _ => Err(AwsServiceError::aws_error(
1844                StatusCode::BAD_REQUEST,
1845                "InvalidParameterValue",
1846                format!("Boolean parameter value '{raw}' is invalid."),
1847            )),
1848        })
1849        .transpose()
1850}
1851
/// One page of results produced by `paginate`.
struct PaginationResult<T> {
    /// The items that belong to this page, in their original order.
    items: Vec<T>,
    /// Marker to pass back for the following page; `None` when this page
    /// reaches the end of the list.
    next_marker: Option<String>,
}
1856
1857fn paginate<T, F>(
1858    mut items: Vec<T>,
1859    marker: Option<String>,
1860    max_records: Option<String>,
1861    get_id: F,
1862) -> Result<PaginationResult<T>, AwsServiceError>
1863where
1864    F: Fn(&T) -> &str,
1865{
1866    // Parse max_records with default 100, max 100
1867    let max = if let Some(max_str) = max_records {
1868        let parsed = max_str.parse::<i32>().map_err(|_| {
1869            AwsServiceError::aws_error(
1870                StatusCode::BAD_REQUEST,
1871                "InvalidParameterValue",
1872                "MaxRecords must be a valid integer.",
1873            )
1874        })?;
1875        if !(1..=100).contains(&parsed) {
1876            return Err(AwsServiceError::aws_error(
1877                StatusCode::BAD_REQUEST,
1878                "InvalidParameterValue",
1879                "MaxRecords must be between 1 and 100.",
1880            ));
1881        }
1882        parsed as usize
1883    } else {
1884        100
1885    };
1886
1887    // Decode marker to get starting identifier
1888    let start_id = if let Some(encoded_marker) = marker {
1889        let decoded = BASE64.decode(encoded_marker.as_bytes()).map_err(|_| {
1890            AwsServiceError::aws_error(
1891                StatusCode::BAD_REQUEST,
1892                "InvalidParameterValue",
1893                "Marker is invalid.",
1894            )
1895        })?;
1896        let id = String::from_utf8(decoded).map_err(|_| {
1897            AwsServiceError::aws_error(
1898                StatusCode::BAD_REQUEST,
1899                "InvalidParameterValue",
1900                "Marker is invalid.",
1901            )
1902        })?;
1903        Some(id)
1904    } else {
1905        None
1906    };
1907
1908    // Find starting position
1909    let start_index = if let Some(ref start_id) = start_id {
1910        items
1911            .iter()
1912            .position(|item| get_id(item) == start_id)
1913            .map(|pos| pos + 1) // Start after the marker
1914            .unwrap_or(items.len()) // If not found, return empty result
1915    } else {
1916        0
1917    };
1918
1919    // Take items from start_index
1920    let total_items = items.len();
1921    let end_index = std::cmp::min(start_index + max, total_items);
1922    let paginated_items: Vec<T> = items.drain(start_index..end_index).collect();
1923
1924    // Create next marker if there are more items
1925    let next_marker = if end_index < total_items {
1926        paginated_items
1927            .last()
1928            .map(|item| BASE64.encode(get_id(item).as_bytes()))
1929    } else {
1930        None
1931    };
1932
1933    Ok(PaginationResult {
1934        items: paginated_items,
1935        next_marker,
1936    })
1937}
1938
1939fn validate_create_request(
1940    db_instance_identifier: &str,
1941    allocated_storage: i32,
1942    db_instance_class: &str,
1943    engine: &str,
1944    engine_version: &str,
1945    port: i32,
1946) -> Result<(), AwsServiceError> {
1947    if allocated_storage <= 0 {
1948        return Err(AwsServiceError::aws_error(
1949            StatusCode::BAD_REQUEST,
1950            "InvalidParameterValue",
1951            "AllocatedStorage must be greater than zero.",
1952        ));
1953    }
1954    if port <= 0 {
1955        return Err(AwsServiceError::aws_error(
1956            StatusCode::BAD_REQUEST,
1957            "InvalidParameterValue",
1958            "Port must be greater than zero.",
1959        ));
1960    }
1961    if !db_instance_identifier
1962        .chars()
1963        .all(|ch| ch.is_ascii_alphanumeric() || ch == '-')
1964    {
1965        return Err(AwsServiceError::aws_error(
1966            StatusCode::BAD_REQUEST,
1967            "InvalidParameterValue",
1968            "DBInstanceIdentifier must contain only alphanumeric characters or hyphens.",
1969        ));
1970    }
1971    // Validate engine
1972    let supported_engines = ["postgres", "mysql", "mariadb"];
1973    if !supported_engines.contains(&engine) {
1974        return Err(AwsServiceError::aws_error(
1975            StatusCode::BAD_REQUEST,
1976            "InvalidParameterValue",
1977            format!("Engine '{}' is not supported.", engine),
1978        ));
1979    }
1980
1981    // Validate engine version
1982    let supported_versions = match engine {
1983        "postgres" => vec!["16.3", "15.5", "14.10", "13.13"],
1984        "mysql" => vec!["8.0.35", "8.0.28", "5.7.44"],
1985        "mariadb" => vec!["10.11.6", "10.6.16"],
1986        _ => vec![],
1987    };
1988
1989    if !supported_versions.contains(&engine_version) {
1990        return Err(AwsServiceError::aws_error(
1991            StatusCode::BAD_REQUEST,
1992            "InvalidParameterValue",
1993            format!("EngineVersion '{engine_version}' is not supported yet."),
1994        ));
1995    }
1996    validate_db_instance_class(db_instance_class)?;
1997    Ok(())
1998}
1999
2000fn validate_db_instance_class(db_instance_class: &str) -> Result<(), AwsServiceError> {
2001    if !crate::state::SUPPORTED_INSTANCE_CLASSES.contains(&db_instance_class) {
2002        return Err(AwsServiceError::aws_error(
2003            StatusCode::BAD_REQUEST,
2004            "InvalidParameterValue",
2005            format!("DBInstanceClass '{}' is not supported.", db_instance_class),
2006        ));
2007    }
2008    Ok(())
2009}
2010
2011fn filter_engine_versions(
2012    versions: &[EngineVersionInfo],
2013    engine: &Option<String>,
2014    engine_version: &Option<String>,
2015    family: &Option<String>,
2016) -> Vec<EngineVersionInfo> {
2017    versions
2018        .iter()
2019        .filter(|candidate| {
2020            engine
2021                .as_ref()
2022                .is_none_or(|expected| candidate.engine == *expected)
2023        })
2024        .filter(|candidate| {
2025            engine_version
2026                .as_ref()
2027                .is_none_or(|expected| candidate.engine_version == *expected)
2028        })
2029        .filter(|candidate| {
2030            family
2031                .as_ref()
2032                .is_none_or(|expected| candidate.db_parameter_group_family == *expected)
2033        })
2034        .cloned()
2035        .collect()
2036}
2037
2038fn filter_orderable_options(
2039    options: &[OrderableDbInstanceOption],
2040    engine: &Option<String>,
2041    engine_version: &Option<String>,
2042    db_instance_class: &Option<String>,
2043    license_model: &Option<String>,
2044    vpc: Option<bool>,
2045) -> Vec<OrderableDbInstanceOption> {
2046    options
2047        .iter()
2048        .filter(|candidate| {
2049            engine
2050                .as_ref()
2051                .is_none_or(|expected| candidate.engine == *expected)
2052        })
2053        .filter(|candidate| {
2054            engine_version
2055                .as_ref()
2056                .is_none_or(|expected| candidate.engine_version == *expected)
2057        })
2058        .filter(|candidate| {
2059            db_instance_class
2060                .as_ref()
2061                .is_none_or(|expected| candidate.db_instance_class == *expected)
2062        })
2063        .filter(|candidate| {
2064            license_model
2065                .as_ref()
2066                .is_none_or(|expected| candidate.license_model == *expected)
2067        })
2068        .filter(|_| vpc.unwrap_or(true))
2069        .cloned()
2070        .collect()
2071}
2072
2073/// Build a `DbInstance` for a newly-created read replica, copying the
2074/// source instance's physical attributes and binding the replica's
2075/// identifier, ARN, resource id, container id and host port.
2076#[allow(clippy::too_many_arguments)]
2077/// Build a `DbInstance` from a restored snapshot. Copies the physical
2078/// attributes off the snapshot and binds the new instance's identifier,
2079/// ARN, resource id, container id and host port.
2080fn build_restored_instance(
2081    db_instance_identifier: &str,
2082    db_instance_arn: String,
2083    dbi_resource_id: String,
2084    created_at: chrono::DateTime<Utc>,
2085    vpc_security_group_ids: Vec<String>,
2086    snapshot: &DbSnapshot,
2087    running: &crate::runtime::RunningDbContainer,
2088) -> DbInstance {
2089    DbInstance {
2090        db_instance_identifier: db_instance_identifier.to_string(),
2091        db_instance_arn,
2092        db_instance_class: "db.t3.micro".to_string(),
2093        engine: snapshot.engine.clone(),
2094        engine_version: snapshot.engine_version.clone(),
2095        db_instance_status: "available".to_string(),
2096        master_username: snapshot.master_username.clone(),
2097        db_name: snapshot.db_name.clone(),
2098        endpoint_address: "127.0.0.1".to_string(),
2099        port: i32::from(running.host_port),
2100        allocated_storage: snapshot.allocated_storage,
2101        publicly_accessible: true,
2102        deletion_protection: false,
2103        created_at,
2104        dbi_resource_id,
2105        master_user_password: snapshot.master_user_password.clone(),
2106        container_id: running.container_id.clone(),
2107        host_port: running.host_port,
2108        tags: Vec::new(),
2109        read_replica_source_db_instance_identifier: None,
2110        read_replica_db_instance_identifiers: Vec::new(),
2111        vpc_security_group_ids,
2112        db_parameter_group_name: None,
2113        backup_retention_period: 1,
2114        preferred_backup_window: "03:00-04:00".to_string(),
2115        latest_restorable_time: Some(created_at),
2116        option_group_name: None,
2117        multi_az: false,
2118        pending_modified_values: None,
2119    }
2120}
2121
2122fn build_read_replica_instance(
2123    db_instance_identifier: &str,
2124    db_instance_arn: String,
2125    dbi_resource_id: String,
2126    created_at: chrono::DateTime<Utc>,
2127    source_db_instance_identifier: &str,
2128    source: &DbInstance,
2129    running: &crate::runtime::RunningDbContainer,
2130) -> DbInstance {
2131    DbInstance {
2132        db_instance_identifier: db_instance_identifier.to_string(),
2133        db_instance_arn,
2134        db_instance_class: source.db_instance_class.clone(),
2135        engine: source.engine.clone(),
2136        engine_version: source.engine_version.clone(),
2137        db_instance_status: "available".to_string(),
2138        master_username: source.master_username.clone(),
2139        db_name: source.db_name.clone(),
2140        endpoint_address: "127.0.0.1".to_string(),
2141        port: i32::from(running.host_port),
2142        allocated_storage: source.allocated_storage,
2143        publicly_accessible: source.publicly_accessible,
2144        deletion_protection: false,
2145        created_at,
2146        dbi_resource_id,
2147        master_user_password: source.master_user_password.clone(),
2148        container_id: running.container_id.clone(),
2149        host_port: running.host_port,
2150        tags: Vec::new(),
2151        read_replica_source_db_instance_identifier: Some(source_db_instance_identifier.to_string()),
2152        read_replica_db_instance_identifiers: Vec::new(),
2153        vpc_security_group_ids: source.vpc_security_group_ids.clone(),
2154        db_parameter_group_name: source.db_parameter_group_name.clone(),
2155        backup_retention_period: source.backup_retention_period,
2156        preferred_backup_window: source.preferred_backup_window.clone(),
2157        latest_restorable_time: if source.backup_retention_period > 0 {
2158            Some(created_at)
2159        } else {
2160            None
2161        },
2162        option_group_name: source.option_group_name.clone(),
2163        multi_az: source.multi_az,
2164        pending_modified_values: None,
2165    }
2166}
2167
2168fn xml_wrap(action: &str, inner: &str, request_id: &str) -> String {
2169    fakecloud_core::query::query_response_xml(action, RDS_NS, inner, request_id)
2170}
2171
2172fn engine_version_xml(version: &EngineVersionInfo) -> String {
2173    format!(
2174        "<DBEngineVersion>\
2175         <Engine>{}</Engine>\
2176         <EngineVersion>{}</EngineVersion>\
2177         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2178         <DBEngineDescription>{}</DBEngineDescription>\
2179         <DBEngineVersionDescription>{}</DBEngineVersionDescription>\
2180         <Status>{}</Status>\
2181         </DBEngineVersion>",
2182        xml_escape(&version.engine),
2183        xml_escape(&version.engine_version),
2184        xml_escape(&version.db_parameter_group_family),
2185        xml_escape(&version.db_engine_description),
2186        xml_escape(&version.db_engine_version_description),
2187        xml_escape(&version.status),
2188    )
2189}
2190
2191fn orderable_option_xml(option: &OrderableDbInstanceOption) -> String {
2192    format!(
2193        "<OrderableDBInstanceOption>\
2194         <Engine>{}</Engine>\
2195         <EngineVersion>{}</EngineVersion>\
2196         <DBInstanceClass>{}</DBInstanceClass>\
2197         <LicenseModel>{}</LicenseModel>\
2198         <AvailabilityZones><AvailabilityZone><Name>us-east-1a</Name></AvailabilityZone></AvailabilityZones>\
2199         <MultiAZCapable>true</MultiAZCapable>\
2200         <ReadReplicaCapable>true</ReadReplicaCapable>\
2201         <Vpc>true</Vpc>\
2202         <SupportsStorageEncryption>true</SupportsStorageEncryption>\
2203         <StorageType>{}</StorageType>\
2204         <SupportsIops>false</SupportsIops>\
2205         <MinStorageSize>{}</MinStorageSize>\
2206         <MaxStorageSize>{}</MaxStorageSize>\
2207         <SupportsIAMDatabaseAuthentication>true</SupportsIAMDatabaseAuthentication>\
2208         </OrderableDBInstanceOption>",
2209        xml_escape(&option.engine),
2210        xml_escape(&option.engine_version),
2211        xml_escape(&option.db_instance_class),
2212        xml_escape(&option.license_model),
2213        xml_escape(&option.storage_type),
2214        option.min_storage_size,
2215        option.max_storage_size,
2216    )
2217}
2218
2219fn tag_xml(tag: &RdsTag) -> String {
2220    format!(
2221        "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2222        xml_escape(&tag.key),
2223        xml_escape(&tag.value),
2224    )
2225}
2226
2227fn db_instance_xml(instance: &DbInstance, status_override: Option<&str>) -> String {
2228    let status = status_override.unwrap_or(&instance.db_instance_status);
2229    let db_name_xml = instance
2230        .db_name
2231        .as_ref()
2232        .map(|db_name| format!("<DBName>{}</DBName>", xml_escape(db_name)))
2233        .unwrap_or_default();
2234
2235    let read_replica_source_xml = instance
2236        .read_replica_source_db_instance_identifier
2237        .as_ref()
2238        .map(|source| {
2239            format!(
2240                "<ReadReplicaSourceDBInstanceIdentifier>{}</ReadReplicaSourceDBInstanceIdentifier>",
2241                xml_escape(source)
2242            )
2243        })
2244        .unwrap_or_default();
2245
2246    let read_replica_identifiers_xml = if instance.read_replica_db_instance_identifiers.is_empty() {
2247        "<ReadReplicaDBInstanceIdentifiers/>".to_string()
2248    } else {
2249        format!(
2250            "<ReadReplicaDBInstanceIdentifiers>{}</ReadReplicaDBInstanceIdentifiers>",
2251            instance
2252                .read_replica_db_instance_identifiers
2253                .iter()
2254                .map(|id| format!(
2255                    "<ReadReplicaDBInstanceIdentifier>{}</ReadReplicaDBInstanceIdentifier>",
2256                    xml_escape(id)
2257                ))
2258                .collect::<String>()
2259        )
2260    };
2261
2262    let vpc_security_groups_xml = if instance.vpc_security_group_ids.is_empty() {
2263        "<VpcSecurityGroups/>".to_string()
2264    } else {
2265        format!(
2266            "<VpcSecurityGroups>{}</VpcSecurityGroups>",
2267            instance
2268                .vpc_security_group_ids
2269                .iter()
2270                .map(|sg_id| format!(
2271                    "<VpcSecurityGroupMembership>\
2272                     <VpcSecurityGroupId>{}</VpcSecurityGroupId>\
2273                     <Status>active</Status>\
2274                     </VpcSecurityGroupMembership>",
2275                    xml_escape(sg_id)
2276                ))
2277                .collect::<String>()
2278        )
2279    };
2280
2281    let db_parameter_groups_xml = match &instance.db_parameter_group_name {
2282        Some(pg_name) => format!(
2283            "<DBParameterGroups>\
2284             <DBParameterGroup>\
2285             <DBParameterGroupName>{}</DBParameterGroupName>\
2286             <ParameterApplyStatus>in-sync</ParameterApplyStatus>\
2287             </DBParameterGroup>\
2288             </DBParameterGroups>",
2289            xml_escape(pg_name)
2290        ),
2291        None => "<DBParameterGroups/>".to_string(),
2292    };
2293
2294    let option_group_memberships_xml = match &instance.option_group_name {
2295        Some(og_name) => format!(
2296            "<OptionGroupMemberships>\
2297             <OptionGroupMembership>\
2298             <OptionGroupName>{}</OptionGroupName>\
2299             <Status>in-sync</Status>\
2300             </OptionGroupMembership>\
2301             </OptionGroupMemberships>",
2302            xml_escape(og_name)
2303        ),
2304        None => "<OptionGroupMemberships/>".to_string(),
2305    };
2306
2307    let pending_modified_values_xml = if let Some(ref pending) = instance.pending_modified_values {
2308        let mut fields = Vec::new();
2309        if let Some(ref class) = pending.db_instance_class {
2310            fields.push(format!(
2311                "<DBInstanceClass>{}</DBInstanceClass>",
2312                xml_escape(class)
2313            ));
2314        }
2315        if let Some(allocated_storage) = pending.allocated_storage {
2316            fields.push(format!(
2317                "<AllocatedStorage>{}</AllocatedStorage>",
2318                allocated_storage
2319            ));
2320        }
2321        if let Some(backup_retention_period) = pending.backup_retention_period {
2322            fields.push(format!(
2323                "<BackupRetentionPeriod>{}</BackupRetentionPeriod>",
2324                backup_retention_period
2325            ));
2326        }
2327        if let Some(multi_az) = pending.multi_az {
2328            fields.push(format!(
2329                "<MultiAZ>{}</MultiAZ>",
2330                if multi_az { "true" } else { "false" }
2331            ));
2332        }
2333        if let Some(ref engine_version) = pending.engine_version {
2334            fields.push(format!(
2335                "<EngineVersion>{}</EngineVersion>",
2336                xml_escape(engine_version)
2337            ));
2338        }
2339        if pending.master_user_password.is_some() {
2340            fields.push("<MasterUserPassword>****</MasterUserPassword>".to_string());
2341        }
2342        if !fields.is_empty() {
2343            format!(
2344                "<PendingModifiedValues>{}</PendingModifiedValues>",
2345                fields.join("")
2346            )
2347        } else {
2348            String::new()
2349        }
2350    } else {
2351        String::new()
2352    };
2353
2354    let latest_restorable_time_xml = instance
2355        .latest_restorable_time
2356        .map(|t| {
2357            format!(
2358                "<LatestRestorableTime>{}</LatestRestorableTime>",
2359                t.to_rfc3339()
2360            )
2361        })
2362        .unwrap_or_default();
2363
2364    format!(
2365        "<DBInstanceIdentifier>{identifier}</DBInstanceIdentifier>\
2366         <DBInstanceClass>{class}</DBInstanceClass>\
2367         <Engine>{engine}</Engine>\
2368         <DBInstanceStatus>{status}</DBInstanceStatus>\
2369         <MasterUsername>{master_username}</MasterUsername>\
2370         {db_name_xml}\
2371         <Endpoint><Address>{endpoint_address}</Address><Port>{port}</Port></Endpoint>\
2372         <AllocatedStorage>{allocated_storage}</AllocatedStorage>\
2373         <InstanceCreateTime>{create_time}</InstanceCreateTime>\
2374         <PreferredBackupWindow>{preferred_backup_window}</PreferredBackupWindow>\
2375         <BackupRetentionPeriod>{backup_retention_period}</BackupRetentionPeriod>\
2376         <DBSecurityGroups/>\
2377         {vpc_security_groups_xml}\
2378         {db_parameter_groups_xml}\
2379         <AvailabilityZone>us-east-1a</AvailabilityZone>\
2380         {latest_restorable_time_xml}\
2381         <PreferredMaintenanceWindow>sun:00:00-sun:00:30</PreferredMaintenanceWindow>\
2382         <MultiAZ>{multi_az}</MultiAZ>\
2383         <EngineVersion>{engine_version}</EngineVersion>\
2384         <AutoMinorVersionUpgrade>true</AutoMinorVersionUpgrade>\
2385         {read_replica_identifiers_xml}\
2386         {read_replica_source_xml}\
2387         <LicenseModel>{license_model}</LicenseModel>\
2388         {option_group_memberships_xml}\
2389         <PubliclyAccessible>{publicly_accessible}</PubliclyAccessible>\
2390         <StorageType>gp2</StorageType>\
2391         <DbInstancePort>{port}</DbInstancePort>\
2392         <StorageEncrypted>false</StorageEncrypted>\
2393         <DbiResourceId>{dbi_resource_id}</DbiResourceId>\
2394         <DeletionProtection>{deletion_protection}</DeletionProtection>\
2395         {pending_modified_values_xml}\
2396         <DBInstanceArn>{arn}</DBInstanceArn>",
2397        identifier = xml_escape(&instance.db_instance_identifier),
2398        class = xml_escape(&instance.db_instance_class),
2399        engine = xml_escape(&instance.engine),
2400        status = xml_escape(status),
2401        master_username = xml_escape(&instance.master_username),
2402        endpoint_address = xml_escape(&instance.endpoint_address),
2403        port = instance.port,
2404        allocated_storage = instance.allocated_storage,
2405        create_time = instance.created_at.to_rfc3339(),
2406        preferred_backup_window = xml_escape(&instance.preferred_backup_window),
2407        backup_retention_period = instance.backup_retention_period,
2408        multi_az = if instance.multi_az { "true" } else { "false" },
2409        engine_version = xml_escape(&instance.engine_version),
2410        license_model = license_model_for_engine(&instance.engine),
2411        publicly_accessible = if instance.publicly_accessible {
2412            "true"
2413        } else {
2414            "false"
2415        },
2416        dbi_resource_id = xml_escape(&instance.dbi_resource_id),
2417        deletion_protection = if instance.deletion_protection {
2418            "true"
2419        } else {
2420            "false"
2421        },
2422        arn = xml_escape(&instance.db_instance_arn),
2423    )
2424}
2425
2426fn db_snapshot_xml(snapshot: &DbSnapshot) -> String {
2427    format!(
2428        "<DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\
2429         <DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2430         <SnapshotCreateTime>{}</SnapshotCreateTime>\
2431         <Engine>{}</Engine>\
2432         <EngineVersion>{}</EngineVersion>\
2433         <AllocatedStorage>{}</AllocatedStorage>\
2434         <Status>{}</Status>\
2435         <Port>{}</Port>\
2436         <MasterUsername>{}</MasterUsername>\
2437         {}\
2438         <DbiResourceId>{}</DbiResourceId>\
2439         <SnapshotType>{}</SnapshotType>\
2440         <DBSnapshotArn>{}</DBSnapshotArn>",
2441        xml_escape(&snapshot.db_snapshot_identifier),
2442        xml_escape(&snapshot.db_instance_identifier),
2443        snapshot.snapshot_create_time.to_rfc3339(),
2444        xml_escape(&snapshot.engine),
2445        xml_escape(&snapshot.engine_version),
2446        snapshot.allocated_storage,
2447        xml_escape(&snapshot.status),
2448        snapshot.port,
2449        xml_escape(&snapshot.master_username),
2450        snapshot
2451            .db_name
2452            .as_ref()
2453            .map(|name| format!("<DBName>{}</DBName>", xml_escape(name)))
2454            .unwrap_or_default(),
2455        xml_escape(&snapshot.dbi_resource_id),
2456        xml_escape(&snapshot.snapshot_type),
2457        xml_escape(&snapshot.db_snapshot_arn),
2458    )
2459}
2460
2461fn db_subnet_group_xml(subnet_group: &DbSubnetGroup) -> String {
2462    let subnets_xml = subnet_group
2463        .subnet_ids
2464        .iter()
2465        .zip(&subnet_group.subnet_availability_zones)
2466        .map(|(subnet_id, az)| {
2467            format!(
2468                "<Subnet>\
2469                 <SubnetIdentifier>{}</SubnetIdentifier>\
2470                 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
2471                 <SubnetStatus>Active</SubnetStatus>\
2472                 </Subnet>",
2473                xml_escape(subnet_id),
2474                xml_escape(az)
2475            )
2476        })
2477        .collect::<String>();
2478
2479    format!(
2480        "<DBSubnetGroupName>{}</DBSubnetGroupName>\
2481         <DBSubnetGroupDescription>{}</DBSubnetGroupDescription>\
2482         <VpcId>{}</VpcId>\
2483         <SubnetGroupStatus>Complete</SubnetGroupStatus>\
2484         <Subnets>{}</Subnets>\
2485         <DBSubnetGroupArn>{}</DBSubnetGroupArn>",
2486        xml_escape(&subnet_group.db_subnet_group_name),
2487        xml_escape(&subnet_group.db_subnet_group_description),
2488        xml_escape(&subnet_group.vpc_id),
2489        subnets_xml,
2490        xml_escape(&subnet_group.db_subnet_group_arn),
2491    )
2492}
2493
2494fn db_parameter_group_xml(parameter_group: &DbParameterGroup) -> String {
2495    format!(
2496        "<DBParameterGroupName>{}</DBParameterGroupName>\
2497         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2498         <Description>{}</Description>\
2499         <DBParameterGroupArn>{}</DBParameterGroupArn>",
2500        xml_escape(&parameter_group.db_parameter_group_name),
2501        xml_escape(&parameter_group.db_parameter_group_family),
2502        xml_escape(&parameter_group.description),
2503        xml_escape(&parameter_group.db_parameter_group_arn),
2504    )
2505}
2506
2507fn db_instance_not_found(identifier: &str) -> AwsServiceError {
2508    AwsServiceError::aws_error(
2509        StatusCode::NOT_FOUND,
2510        "DBInstanceNotFound",
2511        format!("DBInstance {} not found.", identifier),
2512    )
2513}
2514
2515fn db_snapshot_not_found(identifier: &str) -> AwsServiceError {
2516    AwsServiceError::aws_error(
2517        StatusCode::NOT_FOUND,
2518        "DBSnapshotNotFound",
2519        format!("DBSnapshot {} not found.", identifier),
2520    )
2521}
2522
2523fn db_instance_not_found_by_arn(resource_name: &str) -> AwsServiceError {
2524    AwsServiceError::aws_error(
2525        StatusCode::NOT_FOUND,
2526        "DBInstanceNotFound",
2527        format!("DBInstance {resource_name} not found."),
2528    )
2529}
2530
2531fn find_instance_by_arn<'a>(
2532    state: &'a crate::state::RdsState,
2533    resource_name: &str,
2534) -> Result<&'a DbInstance, AwsServiceError> {
2535    state
2536        .instances
2537        .values()
2538        .find(|instance| instance.db_instance_arn == resource_name)
2539        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2540}
2541
2542fn find_instance_by_arn_mut<'a>(
2543    state: &'a mut crate::state::RdsState,
2544    resource_name: &str,
2545) -> Result<&'a mut DbInstance, AwsServiceError> {
2546    state
2547        .instances
2548        .values_mut()
2549        .find(|instance| instance.db_instance_arn == resource_name)
2550        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2551}
2552
2553fn merge_tags(existing: &mut Vec<RdsTag>, incoming: &[RdsTag]) {
2554    for tag in incoming {
2555        if let Some(existing_tag) = existing
2556            .iter_mut()
2557            .find(|candidate| candidate.key == tag.key)
2558        {
2559            existing_tag.value = tag.value.clone();
2560        } else {
2561            existing.push(tag.clone());
2562        }
2563    }
2564}
2565
/// Return the license model string reported for `engine`.
///
/// `mysql` and `mariadb` map to `general-public-license`; every other engine
/// name (matching is exact and case-sensitive) maps to `postgresql-license`.
fn license_model_for_engine(engine: &str) -> &'static str {
    if matches!(engine, "mysql" | "mariadb") {
        "general-public-license"
    } else {
        "postgresql-license"
    }
}
2572
/// Return the database name used for `engine` when none is supplied.
///
/// `mysql` and `mariadb` yield `mysql`; every other engine name (matching is
/// exact and case-sensitive) yields `postgres`.
fn default_db_name(engine: &str) -> &'static str {
    let is_mysql_family = engine == "mysql" || engine == "mariadb";
    if is_mysql_family {
        "mysql"
    } else {
        "postgres"
    }
}
2579
/// Return the default listener port for a new instance of `engine`.
///
/// MySQL and MariaDB share 3306. PostgreSQL, and any engine name that is not
/// recognised, gets 5432.
fn default_port_for_engine(engine: &str) -> i32 {
    const POSTGRES_PORT: i32 = 5432;
    const MYSQL_FAMILY_PORT: i32 = 3306;

    if matches!(engine, "mysql" | "mariadb") {
        MYSQL_FAMILY_PORT
    } else {
        POSTGRES_PORT
    }
}
2589
/// Pick the built-in parameter group name assigned to a new instance when the
/// caller doesn't override it. The name encodes the engine family plus its
/// major version (e.g. `default.postgres16`, `default.mysql8.0`).
///
/// * `postgres`: the major version is the text before the first `.` in
///   `engine_version`; if that text is empty, `16` is used.
/// * `mysql`: `5.7` when `engine_version` starts with `5.7`, otherwise `8.0`.
/// * `mariadb`: `10.11` when `engine_version` starts with `10.11`, otherwise
///   `10.6`.
/// * any other engine: `default.postgres16`.
fn default_parameter_group(engine: &str, engine_version: &str) -> String {
    match engine {
        "postgres" => {
            // `split` always yields at least one piece, so `next()` alone is
            // never `None`. Filter out an empty leading piece (empty version,
            // or one starting with '.') so the "16" fallback actually applies
            // instead of producing the malformed name `default.postgres`.
            let major = engine_version
                .split('.')
                .next()
                .filter(|piece| !piece.is_empty())
                .unwrap_or("16");
            format!("default.postgres{}", major)
        }
        "mysql" => {
            let major = if engine_version.starts_with("5.7") {
                "5.7"
            } else {
                "8.0"
            };
            format!("default.mysql{}", major)
        }
        "mariadb" => {
            let major = if engine_version.starts_with("10.11") {
                "10.11"
            } else {
                "10.6"
            };
            format!("default.mariadb{}", major)
        }
        _ => "default.postgres16".to_string(),
    }
}
2619
2620fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
2621    match error {
2622        RuntimeError::Unavailable => AwsServiceError::aws_error(
2623            StatusCode::SERVICE_UNAVAILABLE,
2624            "InvalidParameterValue",
2625            "Docker/Podman is required for RDS DB instances but is not available",
2626        ),
2627        RuntimeError::ContainerStartFailed(message) => AwsServiceError::aws_error(
2628            StatusCode::INTERNAL_SERVER_ERROR,
2629            "InternalFailure",
2630            message,
2631        ),
2632    }
2633}
2634
// Unit tests for the query-parameter parsing helpers, catalogue filters, tag
// merging and XML rendering in this file, plus one end-to-end call through
// `RdsService::handle`. None of them configure a container runtime.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use bytes::Bytes;
    use chrono::Utc;
    use http::{HeaderMap, Method};
    use parking_lot::RwLock;
    use uuid::Uuid;

    use super::{
        db_instance_xml, filter_engine_versions, filter_orderable_options, merge_tags,
        optional_i32_param, parse_tag_keys, parse_tags, validate_create_request, RdsService,
    };
    use crate::state::{
        default_engine_versions, default_orderable_options, DbInstance, RdsState, RdsTag,
    };
    use fakecloud_core::service::{AwsRequest, AwsService};

    /// Filtering the default engine catalogue by engine `postgres` must yield
    /// exactly four entries, all of them for `postgres`.
    #[test]
    fn filter_engine_versions_matches_requested_engine() {
        let versions = default_engine_versions();

        let filtered =
            filter_engine_versions(&versions, &Some("postgres".to_string()), &None, &None);

        assert_eq!(filtered.len(), 4); // All postgres versions
        assert!(filtered.iter().all(|v| v.engine == "postgres"));
    }

    /// Filtering the default orderable options by engine, version and instance
    /// class must leave exactly one `db.t3.micro` option.
    ///
    /// NOTE(review): the meaning of the trailing `&None` and `Some(true)`
    /// arguments is defined by `filter_orderable_options`, which is not shown
    /// next to this test; check its signature before changing them.
    #[test]
    fn filter_orderable_options_respects_instance_class() {
        let options = default_orderable_options();

        let filtered = filter_orderable_options(
            &options,
            &Some("postgres".to_string()),
            &Some("16.3".to_string()),
            &Some("db.t3.micro".to_string()),
            &None,
            Some(true),
        );

        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].db_instance_class, "db.t3.micro");
    }

    /// The request below must be rejected with `InvalidParameterValue`.
    ///
    /// NOTE(review): the inputs pair engine `mysql` with version `16.3`;
    /// whether the rejection comes from the engine or from the engine/version
    /// combination depends on `validate_create_request` — confirm the test
    /// name still describes what is being exercised.
    #[test]
    fn validate_create_request_rejects_unsupported_engine() {
        let error = validate_create_request("test-db", 20, "db.t3.micro", "mysql", "16.3", 5432)
            .expect_err("unsupported engine");

        assert_eq!(error.code(), "InvalidParameterValue");
    }

    /// A non-numeric `Port` query parameter must be reported as
    /// `InvalidParameterValue` rather than being accepted or ignored.
    #[test]
    fn optional_i32_param_rejects_invalid_integer() {
        let request = request("CreateDBInstance", &[("Port", "not-a-number")]);

        let error = optional_i32_param(&request, "Port").expect_err("invalid port");

        assert_eq!(error.code(), "InvalidParameterValue");
    }

    /// Rendering a fully-populated instance must include its identifier, the
    /// status passed as the second argument (`creating`, even though the
    /// instance itself is stored as `available`), and the endpoint address
    /// immediately followed by the port.
    #[test]
    fn db_instance_xml_renders_endpoint_and_status() {
        let created_at = Utc::now();
        let instance = DbInstance {
            db_instance_identifier: "test-db".to_string(),
            db_instance_arn: "arn:aws:rds:us-east-1:123456789012:db:test-db".to_string(),
            db_instance_class: "db.t3.micro".to_string(),
            engine: "postgres".to_string(),
            engine_version: "16.3".to_string(),
            db_instance_status: "available".to_string(),
            master_username: "admin".to_string(),
            db_name: Some("appdb".to_string()),
            endpoint_address: "127.0.0.1".to_string(),
            port: 15432,
            allocated_storage: 20,
            publicly_accessible: true,
            deletion_protection: false,
            created_at,
            dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
            master_user_password: "secret123".to_string(),
            container_id: "container".to_string(),
            host_port: 15432,
            tags: Vec::new(),
            read_replica_source_db_instance_identifier: None,
            read_replica_db_instance_identifiers: Vec::new(),
            vpc_security_group_ids: vec!["sg-12345678".to_string()],
            db_parameter_group_name: Some("default.postgres16".to_string()),
            backup_retention_period: 1,
            preferred_backup_window: "03:00-04:00".to_string(),
            latest_restorable_time: Some(created_at),
            option_group_name: None,
            multi_az: false,
            pending_modified_values: None,
        };

        let xml = db_instance_xml(&instance, Some("creating"));

        assert!(xml.contains("<DBInstanceIdentifier>test-db</DBInstanceIdentifier>"));
        assert!(xml.contains("<DBInstanceStatus>creating</DBInstanceStatus>"));
        assert!(xml.contains("<Address>127.0.0.1</Address><Port>15432</Port>"));
    }

    /// `Tags.Tag.N.Key` / `Tags.Tag.N.Value` query parameters must parse into
    /// `RdsTag` values ordered by their index `N`.
    #[test]
    fn parse_tags_reads_rds_query_shape() {
        let request = request(
            "AddTagsToResource",
            &[
                ("Tags.Tag.1.Key", "env"),
                ("Tags.Tag.1.Value", "dev"),
                ("Tags.Tag.2.Key", "team"),
                ("Tags.Tag.2.Value", "core"),
            ],
        );

        let tags = parse_tags(&request).expect("tags");

        assert_eq!(
            tags,
            vec![
                RdsTag {
                    key: "env".to_string(),
                    value: "dev".to_string(),
                },
                RdsTag {
                    key: "team".to_string(),
                    value: "core".to_string(),
                }
            ]
        );
    }

    /// `TagKeys.member.N` query parameters must parse into the list of keys
    /// ordered by their index `N`.
    #[test]
    fn parse_tag_keys_reads_member_shape() {
        let request = request(
            "RemoveTagsFromResource",
            &[("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")],
        );

        let tag_keys = parse_tag_keys(&request).expect("tag keys");

        assert_eq!(tag_keys, vec!["env".to_string(), "team".to_string()]);
    }

    /// Merging must overwrite the value of an existing key in place and append
    /// tags whose key is new.
    #[test]
    fn merge_tags_updates_existing_values() {
        let mut tags = vec![RdsTag {
            key: "env".to_string(),
            value: "dev".to_string(),
        }];

        merge_tags(
            &mut tags,
            &[
                RdsTag {
                    key: "env".to_string(),
                    value: "prod".to_string(),
                },
                RdsTag {
                    key: "team".to_string(),
                    value: "core".to_string(),
                },
            ],
        );

        assert_eq!(tags.len(), 2);
        assert_eq!(tags[0].value, "prod");
        assert_eq!(tags[1].key, "team");
    }

    /// End-to-end check through `RdsService::handle`: a
    /// `DescribeDBEngineVersions` request for `postgres` must return a body
    /// containing the response element, the engine name and the `postgres16`
    /// parameter group family. The service is built without a runtime.
    #[tokio::test]
    async fn describe_engine_versions_returns_xml_body() {
        let service = RdsService::new(Arc::new(RwLock::new(RdsState::new(
            "123456789012",
            "us-east-1",
        ))));
        let request = request("DescribeDBEngineVersions", &[("Engine", "postgres")]);

        let response = service.handle(request).await.expect("response");
        let body = String::from_utf8(response.body.expect_bytes().to_vec()).expect("utf8");

        assert!(body.contains("<DescribeDBEngineVersionsResponse"));
        assert!(body.contains("<Engine>postgres</Engine>"));
        assert!(body.contains("<DBParameterGroupFamily>postgres16</DBParameterGroupFamily>"));
    }

    /// Build a minimal query-protocol `AwsRequest` for `action`.
    ///
    /// `params` are copied into `query_params` after the `Action` entry, so a
    /// pair keyed `Action` would replace it. The body is empty and all other
    /// fields are fixed test values.
    fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
        let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
        for (key, value) in params {
            query_params.insert((*key).to_string(), (*value).to_string());
        }

        AwsRequest {
            service: "rds".to_string(),
            action: action.to_string(),
            region: "us-east-1".to_string(),
            account_id: "123456789012".to_string(),
            request_id: "test-request-id".to_string(),
            headers: HeaderMap::new(),
            query_params,
            body: Bytes::new(),
            path_segments: vec![],
            raw_path: "/".to_string(),
            raw_query: String::new(),
            method: Method::POST,
            is_query_protocol: true,
            access_key_id: None,
        }
    }
}