Skip to main content

fakecloud_rds/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_persistence::SnapshotStore;
13
14use crate::runtime::{RdsRuntime, RuntimeError};
15use crate::state::{
16    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
17    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
18    SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
19};
20
21const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
22
23fn is_mutating_action(action: &str) -> bool {
24    matches!(
25        action,
26        "AddTagsToResource"
27            | "CreateDBInstance"
28            | "CreateDBInstanceReadReplica"
29            | "CreateDBParameterGroup"
30            | "CreateDBSnapshot"
31            | "CreateDBSubnetGroup"
32            | "DeleteDBInstance"
33            | "DeleteDBParameterGroup"
34            | "DeleteDBSnapshot"
35            | "DeleteDBSubnetGroup"
36            | "ModifyDBInstance"
37            | "ModifyDBParameterGroup"
38            | "ModifyDBSubnetGroup"
39            | "RebootDBInstance"
40            | "RemoveTagsFromResource"
41            | "RestoreDBInstanceFromDBSnapshot"
42    )
43}
44const SUPPORTED_ACTIONS: &[&str] = &[
45    "AddTagsToResource",
46    "CreateDBInstance",
47    "CreateDBInstanceReadReplica",
48    "CreateDBParameterGroup",
49    "CreateDBSnapshot",
50    "CreateDBSubnetGroup",
51    "DeleteDBInstance",
52    "DeleteDBParameterGroup",
53    "DeleteDBSnapshot",
54    "DeleteDBSubnetGroup",
55    "DescribeDBEngineVersions",
56    "DescribeDBInstances",
57    "DescribeDBParameterGroups",
58    "DescribeDBSnapshots",
59    "DescribeDBSubnetGroups",
60    "DescribeOrderableDBInstanceOptions",
61    "ListTagsForResource",
62    "ModifyDBInstance",
63    "ModifyDBParameterGroup",
64    "ModifyDBSubnetGroup",
65    "RebootDBInstance",
66    "RemoveTagsFromResource",
67    "RestoreDBInstanceFromDBSnapshot",
68];
69
70pub struct RdsService {
71    state: SharedRdsState,
72    runtime: Option<Arc<RdsRuntime>>,
73    snapshot_store: Option<Arc<dyn SnapshotStore>>,
74    snapshot_lock: Arc<AsyncMutex<()>>,
75}
76
77impl RdsService {
78    pub fn new(state: SharedRdsState) -> Self {
79        Self {
80            state,
81            runtime: None,
82            snapshot_store: None,
83            snapshot_lock: Arc::new(AsyncMutex::new(())),
84        }
85    }
86
87    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
88        self.runtime = Some(runtime);
89        self
90    }
91
92    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
93        self.snapshot_store = Some(store);
94        self
95    }
96
97    async fn save_snapshot(&self) {
98        let Some(store) = self.snapshot_store.clone() else {
99            return;
100        };
101        let _guard = self.snapshot_lock.lock().await;
102        let snapshot = RdsSnapshot {
103            schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
104            state: None,
105            accounts: Some(self.state.read().clone()),
106        };
107        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
108            let bytes = serde_json::to_vec(&snapshot)
109                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
110            store.save(&bytes)
111        })
112        .await;
113        match join {
114            Ok(Ok(())) => {}
115            Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
116            Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
117        }
118    }
119
120    /// Return the runtime or a ``ServiceUnavailable`` error if it was not configured.
121    ///
122    /// RDS operations that start, stop, or reach into a database container fail
123    /// with a consistent wire error when the daemon (Docker/Podman) is missing
124    /// rather than each caller restating the message.
125    fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
126        self.runtime.as_ref().ok_or_else(|| {
127            AwsServiceError::aws_error(
128                StatusCode::SERVICE_UNAVAILABLE,
129                "InvalidParameterValue",
130                "Docker/Podman is required for RDS DB instances but is not available",
131            )
132        })
133    }
134}
135
136#[async_trait]
137impl AwsService for RdsService {
138    fn service_name(&self) -> &str {
139        "rds"
140    }
141
142    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
143        let mutates = is_mutating_action(request.action.as_str());
144        let result = match request.action.as_str() {
145            "AddTagsToResource" => self.add_tags_to_resource(&request),
146            "CreateDBInstance" => self.create_db_instance(&request).await,
147            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
148            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
149            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
150            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
151            "DeleteDBInstance" => self.delete_db_instance(&request).await,
152            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
153            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
154            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
155            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
156            "DescribeDBInstances" => self.describe_db_instances(&request),
157            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
158            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
159            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
160            "DescribeOrderableDBInstanceOptions" => {
161                self.describe_orderable_db_instance_options(&request)
162            }
163            "ListTagsForResource" => self.list_tags_for_resource(&request),
164            "ModifyDBInstance" => self.modify_db_instance(&request),
165            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
166            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
167            "RebootDBInstance" => self.reboot_db_instance(&request).await,
168            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
169            "RestoreDBInstanceFromDBSnapshot" => {
170                self.restore_db_instance_from_db_snapshot(&request).await
171            }
172            _ => Err(AwsServiceError::action_not_implemented(
173                self.service_name(),
174                &request.action,
175            )),
176        };
177        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
178            self.save_snapshot().await;
179        }
180        result
181    }
182
183    fn supported_actions(&self) -> &[&str] {
184        SUPPORTED_ACTIONS
185    }
186}
187
188impl RdsService {
189    async fn create_db_instance(
190        &self,
191        request: &AwsRequest,
192    ) -> Result<AwsResponse, AwsServiceError> {
193        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
194        let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
195        let db_instance_class = required_param(request, "DBInstanceClass")?;
196        let engine = required_param(request, "Engine")?;
197        let master_username = required_param(request, "MasterUsername")?;
198        let master_user_password = required_param(request, "MasterUserPassword")?;
199        let db_name = optional_param(request, "DBName");
200        let engine_version =
201            optional_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
202        let publicly_accessible =
203            parse_optional_bool(optional_param(request, "PubliclyAccessible").as_deref())?
204                .unwrap_or(true);
205        let deletion_protection =
206            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?
207                .unwrap_or(false);
208        let port = optional_i32_param(request, "Port")?
209            .unwrap_or_else(|| default_port_for_engine(&engine));
210        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
211
212        let db_parameter_group_name = optional_param(request, "DBParameterGroupName")
213            .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
214
215        let backup_retention_period =
216            optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
217        let preferred_backup_window = optional_param(request, "PreferredBackupWindow")
218            .unwrap_or_else(|| "03:00-04:00".to_string());
219        let option_group_name = optional_param(request, "OptionGroupName");
220        let multi_az =
221            parse_optional_bool(optional_param(request, "MultiAZ").as_deref())?.unwrap_or(false);
222
223        validate_create_request(
224            &db_instance_identifier,
225            allocated_storage,
226            &db_instance_class,
227            &engine,
228            &engine_version,
229            port,
230        )?;
231
232        {
233            let mut accounts = self.state.write();
234            let state = accounts.get_or_create(&request.account_id);
235            if !state.begin_instance_creation(&db_instance_identifier) {
236                return Err(AwsServiceError::aws_error(
237                    StatusCode::BAD_REQUEST,
238                    "DBInstanceAlreadyExists",
239                    format!("DBInstance {} already exists.", db_instance_identifier),
240                ));
241            }
242            // Validate parameter group exists if specified by the caller
243            if let Some(ref pg_name) = db_parameter_group_name {
244                if !state.parameter_groups.contains_key(pg_name) {
245                    state.cancel_instance_creation(&db_instance_identifier);
246                    return Err(AwsServiceError::aws_error(
247                        StatusCode::NOT_FOUND,
248                        "DBParameterGroupNotFound",
249                        format!("DBParameterGroup {} not found.", pg_name),
250                    ));
251                }
252            }
253        }
254
255        let runtime = self.require_runtime()?;
256
257        let logical_db_name = db_name
258            .clone()
259            .unwrap_or_else(|| default_db_name(&engine).to_string());
260        let running = runtime
261            .ensure_postgres(
262                &db_instance_identifier,
263                &engine,
264                &engine_version,
265                &master_username,
266                &master_user_password,
267                &logical_db_name,
268            )
269            .await
270            .map_err(|error| {
271                self.state
272                    .write()
273                    .get_or_create(&request.account_id)
274                    .cancel_instance_creation(&db_instance_identifier);
275                runtime_error_to_service_error(error)
276            })?;
277
278        let mut accounts = self.state.write();
279        let state = accounts.get_or_create(&request.account_id);
280        let created_at = Utc::now();
281        let instance = DbInstance {
282            db_instance_identifier: db_instance_identifier.clone(),
283            db_instance_arn: state.db_instance_arn(&db_instance_identifier),
284            db_instance_class: db_instance_class.clone(),
285            engine: engine.clone(),
286            engine_version: engine_version.clone(),
287            db_instance_status: "available".to_string(),
288            master_username: master_username.clone(),
289            db_name: db_name.clone(),
290            endpoint_address: "127.0.0.1".to_string(),
291            port: i32::from(running.host_port),
292            allocated_storage,
293            publicly_accessible,
294            deletion_protection,
295            created_at,
296            dbi_resource_id: state.next_dbi_resource_id(),
297            master_user_password,
298            container_id: running.container_id,
299            host_port: running.host_port,
300            tags: Vec::new(),
301            read_replica_source_db_instance_identifier: None,
302            read_replica_db_instance_identifiers: Vec::new(),
303            vpc_security_group_ids,
304            db_parameter_group_name,
305            backup_retention_period,
306            preferred_backup_window,
307            latest_restorable_time: if backup_retention_period > 0 {
308                Some(created_at)
309            } else {
310                None
311            },
312            option_group_name,
313            multi_az,
314            pending_modified_values: None,
315        };
316        state.finish_instance_creation(instance.clone());
317
318        Ok(AwsResponse::xml(
319            StatusCode::OK,
320            xml_wrap(
321                "CreateDBInstance",
322                &format!(
323                    "<DBInstance>{}</DBInstance>",
324                    db_instance_xml(&instance, Some("creating"))
325                ),
326                &request.request_id,
327            ),
328        ))
329    }
330
331    async fn delete_db_instance(
332        &self,
333        request: &AwsRequest,
334    ) -> Result<AwsResponse, AwsServiceError> {
335        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
336        let skip_final_snapshot =
337            parse_optional_bool(optional_param(request, "SkipFinalSnapshot").as_deref())?
338                .unwrap_or(false);
339        let final_db_snapshot_identifier = optional_param(request, "FinalDBSnapshotIdentifier");
340
341        if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
342            return Err(AwsServiceError::aws_error(
343                StatusCode::BAD_REQUEST,
344                "InvalidParameterCombination",
345                "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
346            ));
347        }
348        if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
349            return Err(AwsServiceError::aws_error(
350                StatusCode::BAD_REQUEST,
351                "InvalidParameterCombination",
352                "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
353            ));
354        }
355
356        // Check deletion protection BEFORE creating snapshot or making any changes
357        {
358            let accounts = self.state.read();
359            let empty = RdsState::new(&request.account_id, &request.region);
360            let state = accounts.get(&request.account_id).unwrap_or(&empty);
361            if let Some(instance) = state.instances.get(&db_instance_identifier) {
362                if instance.deletion_protection {
363                    return Err(AwsServiceError::aws_error(
364                        StatusCode::BAD_REQUEST,
365                        "InvalidDBInstanceState",
366                        format!(
367                            "DBInstance {} cannot be deleted because deletion protection is enabled.",
368                            db_instance_identifier
369                        ),
370                    ));
371                }
372            } else {
373                return Err(db_instance_not_found(&db_instance_identifier));
374            }
375        }
376
377        if let Some(ref snapshot_id) = final_db_snapshot_identifier {
378            self.create_final_db_snapshot(
379                &db_instance_identifier,
380                snapshot_id,
381                &request.account_id,
382                &request.region,
383            )
384            .await?;
385        }
386
387        let instance = {
388            let mut accounts = self.state.write();
389            let state = accounts.get_or_create(&request.account_id);
390            let instance = state
391                .instances
392                .remove(&db_instance_identifier)
393                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
394
395            if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
396                if let Some(source) = state.instances.get_mut(source_id) {
397                    source
398                        .read_replica_db_instance_identifiers
399                        .retain(|id| id != &db_instance_identifier);
400                }
401            }
402
403            for replica_id in &instance.read_replica_db_instance_identifiers {
404                if let Some(replica) = state.instances.get_mut(replica_id) {
405                    replica.read_replica_source_db_instance_identifier = None;
406                }
407            }
408
409            instance
410        };
411
412        if let Some(runtime) = &self.runtime {
413            runtime.stop_container(&db_instance_identifier).await;
414        }
415
416        Ok(AwsResponse::xml(
417            StatusCode::OK,
418            xml_wrap(
419                "DeleteDBInstance",
420                &format!(
421                    "<DBInstance>{}</DBInstance>",
422                    db_instance_xml(&instance, Some("deleting"))
423                ),
424                &request.request_id,
425            ),
426        ))
427    }
428
429    /// Take a final snapshot of an instance that is about to be deleted,
430    /// persisting the dumped database into `state.snapshots`. The DLQ-style
431    /// conflict check runs twice — once under the read lock before paying
432    /// for the dump, once under the write lock before committing — to keep
433    /// concurrent deletes from colliding.
434    async fn create_final_db_snapshot(
435        &self,
436        db_instance_identifier: &str,
437        snapshot_id: &str,
438        account_id: &str,
439        region: &str,
440    ) -> Result<(), AwsServiceError> {
441        let runtime = self.runtime.as_ref().ok_or_else(|| {
442            AwsServiceError::aws_error(
443                StatusCode::SERVICE_UNAVAILABLE,
444                "InvalidParameterValue",
445                "Docker/Podman is required for RDS snapshots but is not available",
446            )
447        })?;
448
449        let (instance_for_snapshot, db_name) = {
450            let accounts = self.state.read();
451            let empty = RdsState::new(account_id, region);
452            let state = accounts.get(account_id).unwrap_or(&empty);
453
454            if state.snapshots.contains_key(snapshot_id) {
455                return Err(AwsServiceError::aws_error(
456                    StatusCode::CONFLICT,
457                    "DBSnapshotAlreadyExists",
458                    format!("DBSnapshot {snapshot_id} already exists."),
459                ));
460            }
461
462            let instance = state
463                .instances
464                .get(db_instance_identifier)
465                .cloned()
466                .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
467
468            let default_db = default_db_name(&instance.engine);
469            let db_name = instance
470                .db_name
471                .as_deref()
472                .unwrap_or(default_db)
473                .to_string();
474
475            (instance, db_name)
476        };
477
478        let dump_data = runtime
479            .dump_database(
480                db_instance_identifier,
481                &instance_for_snapshot.engine,
482                &instance_for_snapshot.master_username,
483                &instance_for_snapshot.master_user_password,
484                &db_name,
485            )
486            .await
487            .map_err(runtime_error_to_service_error)?;
488
489        let mut accounts = self.state.write();
490        let state = accounts.get_or_create(account_id);
491
492        if state.snapshots.contains_key(snapshot_id) {
493            return Err(AwsServiceError::aws_error(
494                StatusCode::CONFLICT,
495                "DBSnapshotAlreadyExists",
496                format!("DBSnapshot {snapshot_id} already exists."),
497            ));
498        }
499
500        let snapshot_arn = state.db_snapshot_arn(snapshot_id);
501
502        let snapshot = DbSnapshot {
503            db_snapshot_identifier: snapshot_id.to_string(),
504            db_snapshot_arn: snapshot_arn,
505            db_instance_identifier: db_instance_identifier.to_string(),
506            snapshot_create_time: Utc::now(),
507            engine: instance_for_snapshot.engine.clone(),
508            engine_version: instance_for_snapshot.engine_version.clone(),
509            allocated_storage: instance_for_snapshot.allocated_storage,
510            status: "available".to_string(),
511            port: instance_for_snapshot.port,
512            master_username: instance_for_snapshot.master_username.clone(),
513            db_name: instance_for_snapshot.db_name.clone(),
514            dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
515            snapshot_type: "manual".to_string(),
516            master_user_password: instance_for_snapshot.master_user_password.clone(),
517            tags: Vec::new(),
518            dump_data,
519        };
520
521        state.snapshots.insert(snapshot_id.to_string(), snapshot);
522        Ok(())
523    }
524
525    fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
526        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
527        let db_instance_class = optional_param(request, "DBInstanceClass");
528        let deletion_protection =
529            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?;
530        let apply_immediately =
531            parse_optional_bool(optional_param(request, "ApplyImmediately").as_deref())?;
532
533        // Parse VPC security group IDs - only if at least one is provided
534        let vpc_security_group_ids = {
535            let mut ids = Vec::new();
536            for index in 1.. {
537                let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
538                match optional_param(request, &sg_id_name) {
539                    Some(sg_id) => ids.push(sg_id),
540                    None => break,
541                }
542            }
543            if ids.is_empty() {
544                None
545            } else {
546                Some(ids)
547            }
548        };
549
550        if db_instance_class.is_none()
551            && deletion_protection.is_none()
552            && vpc_security_group_ids.is_none()
553        {
554            return Err(AwsServiceError::aws_error(
555                StatusCode::BAD_REQUEST,
556                "InvalidParameterCombination",
557                "At least one supported mutable field must be provided.",
558            ));
559        }
560        if let Some(ref class) = db_instance_class {
561            validate_db_instance_class(class)?;
562        }
563
564        let mut accounts = self.state.write();
565        let state = accounts.get_or_create(&request.account_id);
566        let instance = state
567            .instances
568            .get_mut(&db_instance_identifier)
569            .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
570
571        // If ApplyImmediately is false, stage changes as pending
572        if apply_immediately == Some(false) {
573            let pending = instance
574                .pending_modified_values
575                .get_or_insert(Default::default());
576            if let Some(class) = db_instance_class {
577                pending.db_instance_class = Some(class);
578            }
579            // Note: deletion_protection and vpc_security_group_ids are applied immediately
580            // regardless of ApplyImmediately flag (per AWS behavior)
581            if let Some(deletion_protection) = deletion_protection {
582                instance.deletion_protection = deletion_protection;
583            }
584            if let Some(security_group_ids) = vpc_security_group_ids {
585                instance.vpc_security_group_ids = security_group_ids;
586            }
587        } else {
588            // Apply immediately (default behavior)
589            if let Some(class) = db_instance_class {
590                instance.db_instance_class = class;
591            }
592            if let Some(deletion_protection) = deletion_protection {
593                instance.deletion_protection = deletion_protection;
594            }
595            if let Some(security_group_ids) = vpc_security_group_ids {
596                instance.vpc_security_group_ids = security_group_ids;
597            }
598        }
599
600        Ok(AwsResponse::xml(
601            StatusCode::OK,
602            xml_wrap(
603                "ModifyDBInstance",
604                &format!(
605                    "<DBInstance>{}</DBInstance>",
606                    db_instance_xml(instance, Some("modifying"))
607                ),
608                &request.request_id,
609            ),
610        ))
611    }
612
613    async fn reboot_db_instance(
614        &self,
615        request: &AwsRequest,
616    ) -> Result<AwsResponse, AwsServiceError> {
617        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
618        let force_failover =
619            parse_optional_bool(optional_param(request, "ForceFailover").as_deref())?;
620        if force_failover == Some(true) {
621            return Err(AwsServiceError::aws_error(
622                StatusCode::BAD_REQUEST,
623                "InvalidParameterCombination",
624                "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
625            ));
626        }
627
628        let instance = {
629            let accounts = self.state.read();
630            let empty = RdsState::new(&request.account_id, &request.region);
631            let state = accounts.get(&request.account_id).unwrap_or(&empty);
632            state
633                .instances
634                .get(&db_instance_identifier)
635                .cloned()
636                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
637        };
638
639        let runtime = self.require_runtime()?;
640
641        let running = runtime
642            .restart_container(
643                &db_instance_identifier,
644                &instance.engine,
645                &instance.master_username,
646                &instance.master_user_password,
647                instance
648                    .db_name
649                    .as_deref()
650                    .unwrap_or(default_db_name(&instance.engine)),
651            )
652            .await
653            .map_err(runtime_error_to_service_error)?;
654
655        let instance = {
656            let mut accounts = self.state.write();
657            let state = accounts.get_or_create(&request.account_id);
658            let instance = state
659                .instances
660                .get_mut(&db_instance_identifier)
661                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
662            instance.host_port = running.host_port;
663            instance.port = i32::from(running.host_port);
664
665            // Apply any pending modifications
666            if let Some(pending) = instance.pending_modified_values.take() {
667                if let Some(class) = pending.db_instance_class {
668                    instance.db_instance_class = class;
669                }
670                if let Some(allocated_storage) = pending.allocated_storage {
671                    instance.allocated_storage = allocated_storage;
672                }
673                if let Some(backup_retention_period) = pending.backup_retention_period {
674                    instance.backup_retention_period = backup_retention_period;
675                }
676                if let Some(multi_az) = pending.multi_az {
677                    instance.multi_az = multi_az;
678                }
679                if let Some(engine_version) = pending.engine_version {
680                    instance.engine_version = engine_version;
681                }
682                if let Some(master_user_password) = pending.master_user_password {
683                    instance.master_user_password = master_user_password;
684                }
685            }
686
687            instance.clone()
688        };
689
690        Ok(AwsResponse::xml(
691            StatusCode::OK,
692            xml_wrap(
693                "RebootDBInstance",
694                &format!(
695                    "<DBInstance>{}</DBInstance>",
696                    db_instance_xml(&instance, Some("rebooting"))
697                ),
698                &request.request_id,
699            ),
700        ))
701    }
702
703    fn describe_db_engine_versions(
704        &self,
705        request: &AwsRequest,
706    ) -> Result<AwsResponse, AwsServiceError> {
707        let engine = optional_param(request, "Engine");
708        let engine_version = optional_param(request, "EngineVersion");
709        let family = optional_param(request, "DBParameterGroupFamily");
710        let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
711
712        let mut versions = filter_engine_versions(
713            &default_engine_versions(),
714            &engine,
715            &engine_version,
716            &family,
717        );
718
719        if default_only.unwrap_or(false) {
720            versions.truncate(1);
721        }
722
723        Ok(AwsResponse::xml(
724            StatusCode::OK,
725            xml_wrap(
726                "DescribeDBEngineVersions",
727                &format!(
728                    "<DBEngineVersions>{}</DBEngineVersions>",
729                    versions.iter().map(engine_version_xml).collect::<String>()
730                ),
731                &request.request_id,
732            ),
733        ))
734    }
735
736    fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
737        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
738        let marker = optional_param(request, "Marker");
739        let max_records = optional_param(request, "MaxRecords");
740
741        let accounts = self.state.read();
742        let empty = RdsState::new(&request.account_id, &request.region);
743        let state = accounts.get(&request.account_id).unwrap_or(&empty);
744
745        // If specific identifier requested, return just that one (no pagination)
746        if let Some(identifier) = db_instance_identifier {
747            let instance = state
748                .instances
749                .get(&identifier)
750                .cloned()
751                .ok_or_else(|| db_instance_not_found(&identifier))?;
752
753            return Ok(AwsResponse::xml(
754                StatusCode::OK,
755                xml_wrap(
756                    "DescribeDBInstances",
757                    &format!(
758                        "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
759                        db_instance_xml(&instance, None)
760                    ),
761                    &request.request_id,
762                ),
763            ));
764        }
765
766        // Get all instances sorted by created_at, then identifier
767        let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
768        instances.sort_by(|a, b| {
769            a.created_at
770                .cmp(&b.created_at)
771                .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
772        });
773
774        // Apply pagination
775        let paginated = paginate(instances, marker, max_records, |inst| {
776            &inst.db_instance_identifier
777        })?;
778
779        let marker_xml = paginated
780            .next_marker
781            .as_ref()
782            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
783            .unwrap_or_default();
784
785        Ok(AwsResponse::xml(
786            StatusCode::OK,
787            xml_wrap(
788                "DescribeDBInstances",
789                &format!(
790                    "<DBInstances>{}</DBInstances>{}",
791                    paginated
792                        .items
793                        .iter()
794                        .map(|instance| {
795                            format!(
796                                "<DBInstance>{}</DBInstance>",
797                                db_instance_xml(instance, None)
798                            )
799                        })
800                        .collect::<String>(),
801                    marker_xml
802                ),
803                &request.request_id,
804            ),
805        ))
806    }
807
808    fn describe_orderable_db_instance_options(
809        &self,
810        request: &AwsRequest,
811    ) -> Result<AwsResponse, AwsServiceError> {
812        let engine = optional_param(request, "Engine");
813        let engine_version = optional_param(request, "EngineVersion");
814        let db_instance_class = optional_param(request, "DBInstanceClass");
815        let license_model = optional_param(request, "LicenseModel");
816        let vpc = parse_optional_bool(optional_param(request, "Vpc").as_deref())?;
817
818        let options = filter_orderable_options(
819            &default_orderable_options(),
820            &engine,
821            &engine_version,
822            &db_instance_class,
823            &license_model,
824            vpc,
825        );
826
827        Ok(AwsResponse::xml(
828            StatusCode::OK,
829            xml_wrap(
830                "DescribeOrderableDBInstanceOptions",
831                &format!(
832                    "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
833                    options.iter().map(orderable_option_xml).collect::<String>()
834                ),
835                &request.request_id,
836            ),
837        ))
838    }
839
840    fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
841        let resource_name = required_param(request, "ResourceName")?;
842        let tags = parse_tags(request)?;
843
844        if tags.is_empty() {
845            return Err(AwsServiceError::aws_error(
846                StatusCode::BAD_REQUEST,
847                "MissingParameter",
848                "The request must contain the parameter Tags.",
849            ));
850        }
851
852        let mut accounts = self.state.write();
853        let state = accounts.get_or_create(&request.account_id);
854        let instance = find_instance_by_arn_mut(state, &resource_name)?;
855        merge_tags(&mut instance.tags, &tags);
856
857        Ok(AwsResponse::xml(
858            StatusCode::OK,
859            xml_wrap("AddTagsToResource", "", &request.request_id),
860        ))
861    }
862
863    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
864        let resource_name = required_param(request, "ResourceName")?;
865        if query_param_prefix_exists(request, "Filters.") {
866            return Err(AwsServiceError::aws_error(
867                StatusCode::BAD_REQUEST,
868                "InvalidParameterValue",
869                "Filters are not yet supported for ListTagsForResource.",
870            ));
871        }
872
873        let accounts = self.state.read();
874        let empty = RdsState::new(&request.account_id, &request.region);
875        let state = accounts.get(&request.account_id).unwrap_or(&empty);
876        let instance = find_instance_by_arn(state, &resource_name)?;
877        let tag_xml = instance.tags.iter().map(tag_xml).collect::<String>();
878
879        Ok(AwsResponse::xml(
880            StatusCode::OK,
881            xml_wrap(
882                "ListTagsForResource",
883                &format!("<TagList>{tag_xml}</TagList>"),
884                &request.request_id,
885            ),
886        ))
887    }
888
889    fn remove_tags_from_resource(
890        &self,
891        request: &AwsRequest,
892    ) -> Result<AwsResponse, AwsServiceError> {
893        let resource_name = required_param(request, "ResourceName")?;
894        let tag_keys = parse_tag_keys(request)?;
895
896        if tag_keys.is_empty() {
897            return Err(AwsServiceError::aws_error(
898                StatusCode::BAD_REQUEST,
899                "MissingParameter",
900                "The request must contain the parameter TagKeys.",
901            ));
902        }
903
904        let mut accounts = self.state.write();
905        let state = accounts.get_or_create(&request.account_id);
906        let instance = find_instance_by_arn_mut(state, &resource_name)?;
907        instance
908            .tags
909            .retain(|tag| !tag_keys.iter().any(|key| key == &tag.key));
910
911        Ok(AwsResponse::xml(
912            StatusCode::OK,
913            xml_wrap("RemoveTagsFromResource", "", &request.request_id),
914        ))
915    }
916
917    async fn create_db_snapshot(
918        &self,
919        request: &AwsRequest,
920    ) -> Result<AwsResponse, AwsServiceError> {
921        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
922        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
923
924        let runtime = self.runtime.as_ref().ok_or_else(|| {
925            AwsServiceError::aws_error(
926                StatusCode::SERVICE_UNAVAILABLE,
927                "InvalidParameterValue",
928                "Docker/Podman is required for RDS snapshots but is not available",
929            )
930        })?;
931
932        let (instance, db_name) = {
933            let accounts = self.state.read();
934            let empty = RdsState::new(&request.account_id, &request.region);
935            let state = accounts.get(&request.account_id).unwrap_or(&empty);
936
937            if state.snapshots.contains_key(&db_snapshot_identifier) {
938                return Err(AwsServiceError::aws_error(
939                    StatusCode::CONFLICT,
940                    "DBSnapshotAlreadyExists",
941                    format!("DBSnapshot {db_snapshot_identifier} already exists."),
942                ));
943            }
944
945            let instance = state
946                .instances
947                .get(&db_instance_identifier)
948                .cloned()
949                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
950
951            let default_db = default_db_name(&instance.engine);
952            let db_name = instance
953                .db_name
954                .as_deref()
955                .unwrap_or(default_db)
956                .to_string();
957
958            (instance, db_name)
959        };
960
961        let dump_data = runtime
962            .dump_database(
963                &db_instance_identifier,
964                &instance.engine,
965                &instance.master_username,
966                &instance.master_user_password,
967                &db_name,
968            )
969            .await
970            .map_err(runtime_error_to_service_error)?;
971
972        let mut accounts = self.state.write();
973        let state = accounts.get_or_create(&request.account_id);
974
975        if state.snapshots.contains_key(&db_snapshot_identifier) {
976            return Err(AwsServiceError::aws_error(
977                StatusCode::CONFLICT,
978                "DBSnapshotAlreadyExists",
979                format!("DBSnapshot {db_snapshot_identifier} already exists."),
980            ));
981        }
982
983        let snapshot = DbSnapshot {
984            db_snapshot_identifier: db_snapshot_identifier.clone(),
985            db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
986            db_instance_identifier: instance.db_instance_identifier.clone(),
987            snapshot_create_time: Utc::now(),
988            engine: instance.engine.clone(),
989            engine_version: instance.engine_version.clone(),
990            allocated_storage: instance.allocated_storage,
991            status: "available".to_string(),
992            port: instance.port,
993            master_username: instance.master_username.clone(),
994            db_name: instance.db_name.clone(),
995            dbi_resource_id: instance.dbi_resource_id.clone(),
996            snapshot_type: "manual".to_string(),
997            master_user_password: instance.master_user_password.clone(),
998            tags: Vec::new(),
999            dump_data,
1000        };
1001
1002        state
1003            .snapshots
1004            .insert(db_snapshot_identifier, snapshot.clone());
1005
1006        Ok(AwsResponse::xml(
1007            StatusCode::OK,
1008            xml_wrap(
1009                "CreateDBSnapshot",
1010                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1011                &request.request_id,
1012            ),
1013        ))
1014    }
1015
1016    fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1017        let db_snapshot_identifier = optional_param(request, "DBSnapshotIdentifier");
1018        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1019        let marker = optional_param(request, "Marker");
1020        let max_records = optional_param(request, "MaxRecords");
1021
1022        if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
1023            return Err(AwsServiceError::aws_error(
1024                StatusCode::BAD_REQUEST,
1025                "InvalidParameterCombination",
1026                "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
1027            ));
1028        }
1029
1030        let accounts = self.state.read();
1031        let empty = RdsState::new(&request.account_id, &request.region);
1032        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1033
1034        // If specific snapshot requested, return just that one (no pagination)
1035        if let Some(snapshot_id) = db_snapshot_identifier {
1036            let snapshot = state
1037                .snapshots
1038                .get(&snapshot_id)
1039                .cloned()
1040                .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
1041
1042            return Ok(AwsResponse::xml(
1043                StatusCode::OK,
1044                xml_wrap(
1045                    "DescribeDBSnapshots",
1046                    &format!(
1047                        "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
1048                        db_snapshot_xml(&snapshot)
1049                    ),
1050                    &request.request_id,
1051                ),
1052            ));
1053        }
1054
1055        // Get snapshots, filtered by instance identifier if provided
1056        let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
1057            state
1058                .snapshots
1059                .values()
1060                .filter(|s| s.db_instance_identifier == instance_id)
1061                .cloned()
1062                .collect()
1063        } else {
1064            state.snapshots.values().cloned().collect()
1065        };
1066
1067        // Sort by creation time, then identifier
1068        snapshots.sort_by(|a, b| {
1069            a.snapshot_create_time
1070                .cmp(&b.snapshot_create_time)
1071                .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
1072        });
1073
1074        // Apply pagination
1075        let paginated = paginate(snapshots, marker, max_records, |snap| {
1076            &snap.db_snapshot_identifier
1077        })?;
1078
1079        let marker_xml = paginated
1080            .next_marker
1081            .as_ref()
1082            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1083            .unwrap_or_default();
1084
1085        Ok(AwsResponse::xml(
1086            StatusCode::OK,
1087            xml_wrap(
1088                "DescribeDBSnapshots",
1089                &format!(
1090                    "<DBSnapshots>{}</DBSnapshots>{}",
1091                    paginated
1092                        .items
1093                        .iter()
1094                        .map(|snapshot| format!(
1095                            "<DBSnapshot>{}</DBSnapshot>",
1096                            db_snapshot_xml(snapshot)
1097                        ))
1098                        .collect::<String>(),
1099                    marker_xml
1100                ),
1101                &request.request_id,
1102            ),
1103        ))
1104    }
1105
1106    fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1107        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1108
1109        let mut accounts = self.state.write();
1110        let state = accounts.get_or_create(&request.account_id);
1111
1112        let snapshot = state
1113            .snapshots
1114            .remove(&db_snapshot_identifier)
1115            .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1116
1117        Ok(AwsResponse::xml(
1118            StatusCode::OK,
1119            xml_wrap(
1120                "DeleteDBSnapshot",
1121                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1122                &request.request_id,
1123            ),
1124        ))
1125    }
1126
1127    async fn restore_db_instance_from_db_snapshot(
1128        &self,
1129        request: &AwsRequest,
1130    ) -> Result<AwsResponse, AwsServiceError> {
1131        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1132        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1133        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
1134
1135        let runtime = self.require_runtime()?;
1136
1137        let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
1138            let mut accounts = self.state.write();
1139            let state = accounts.get_or_create(&request.account_id);
1140
1141            if !state.begin_instance_creation(&db_instance_identifier) {
1142                return Err(AwsServiceError::aws_error(
1143                    StatusCode::CONFLICT,
1144                    "DBInstanceAlreadyExists",
1145                    format!("DBInstance {db_instance_identifier} already exists."),
1146                ));
1147            }
1148
1149            let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
1150                Some(s) => s,
1151                None => {
1152                    state.cancel_instance_creation(&db_instance_identifier);
1153                    return Err(db_snapshot_not_found(&db_snapshot_identifier));
1154                }
1155            };
1156
1157            let dbi_resource_id = state.next_dbi_resource_id();
1158            let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
1159            let created_at = Utc::now();
1160
1161            (snapshot, dbi_resource_id, db_instance_arn, created_at)
1162        };
1163
1164        let db_name = snapshot
1165            .db_name
1166            .as_deref()
1167            .unwrap_or(default_db_name(&snapshot.engine));
1168        let running = match runtime
1169            .ensure_postgres(
1170                &db_instance_identifier,
1171                &snapshot.engine,
1172                &snapshot.engine_version,
1173                &snapshot.master_username,
1174                &snapshot.master_user_password,
1175                db_name,
1176            )
1177            .await
1178        {
1179            Ok(running) => running,
1180            Err(e) => {
1181                self.state
1182                    .write()
1183                    .get_or_create(&request.account_id)
1184                    .cancel_instance_creation(&db_instance_identifier);
1185                return Err(runtime_error_to_service_error(e));
1186            }
1187        };
1188
1189        if let Err(e) = runtime
1190            .restore_database(
1191                &db_instance_identifier,
1192                &snapshot.engine,
1193                &snapshot.master_username,
1194                &snapshot.master_user_password,
1195                db_name,
1196                &snapshot.dump_data,
1197            )
1198            .await
1199        {
1200            self.state
1201                .write()
1202                .get_or_create(&request.account_id)
1203                .cancel_instance_creation(&db_instance_identifier);
1204            runtime.stop_container(&db_instance_identifier).await;
1205            return Err(runtime_error_to_service_error(e));
1206        }
1207
1208        let instance = build_restored_instance(
1209            &db_instance_identifier,
1210            db_instance_arn,
1211            dbi_resource_id,
1212            created_at,
1213            vpc_security_group_ids,
1214            &snapshot,
1215            &running,
1216        );
1217
1218        self.state
1219            .write()
1220            .get_or_create(&request.account_id)
1221            .finish_instance_creation(instance.clone());
1222
1223        Ok(AwsResponse::xml(
1224            StatusCode::OK,
1225            xml_wrap(
1226                "RestoreDBInstanceFromDBSnapshot",
1227                &format!(
1228                    "<DBInstance>{}</DBInstance>",
1229                    db_instance_xml(&instance, None)
1230                ),
1231                &request.request_id,
1232            ),
1233        ))
1234    }
1235
1236    async fn create_db_instance_read_replica(
1237        &self,
1238        request: &AwsRequest,
1239    ) -> Result<AwsResponse, AwsServiceError> {
1240        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1241        let source_db_instance_identifier = required_param(request, "SourceDBInstanceIdentifier")?;
1242
1243        let runtime = self.runtime.as_ref().ok_or_else(|| {
1244            AwsServiceError::aws_error(
1245                StatusCode::SERVICE_UNAVAILABLE,
1246                "InvalidParameterValue",
1247                "Docker/Podman is required for RDS read replicas but is not available",
1248            )
1249        })?;
1250
1251        let (source_instance, db_name) = {
1252            let mut accounts = self.state.write();
1253            let state = accounts.get_or_create(&request.account_id);
1254
1255            if !state.begin_instance_creation(&db_instance_identifier) {
1256                return Err(AwsServiceError::aws_error(
1257                    StatusCode::CONFLICT,
1258                    "DBInstanceAlreadyExists",
1259                    format!("DBInstance {db_instance_identifier} already exists."),
1260                ));
1261            }
1262
1263            let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
1264            {
1265                Some(inst) => inst,
1266                None => {
1267                    state.cancel_instance_creation(&db_instance_identifier);
1268                    return Err(db_instance_not_found(&source_db_instance_identifier));
1269                }
1270            };
1271
1272            let default_db = default_db_name(&source_instance.engine);
1273            let db_name = source_instance
1274                .db_name
1275                .as_deref()
1276                .unwrap_or(default_db)
1277                .to_string();
1278
1279            (source_instance, db_name)
1280        };
1281
1282        let dump_data = match runtime
1283            .dump_database(
1284                &source_db_instance_identifier,
1285                &source_instance.engine,
1286                &source_instance.master_username,
1287                &source_instance.master_user_password,
1288                &db_name,
1289            )
1290            .await
1291        {
1292            Ok(data) => data,
1293            Err(e) => {
1294                self.state
1295                    .write()
1296                    .get_or_create(&request.account_id)
1297                    .cancel_instance_creation(&db_instance_identifier);
1298                return Err(runtime_error_to_service_error(e));
1299            }
1300        };
1301
1302        let (dbi_resource_id, db_instance_arn) = {
1303            let accounts = self.state.read();
1304            let empty = RdsState::new(&request.account_id, &request.region);
1305            let s = accounts.get(&request.account_id).unwrap_or(&empty);
1306            (
1307                s.next_dbi_resource_id(),
1308                s.db_instance_arn(&db_instance_identifier),
1309            )
1310        };
1311        let created_at = Utc::now();
1312
1313        let running = match runtime
1314            .ensure_postgres(
1315                &db_instance_identifier,
1316                &source_instance.engine,
1317                &source_instance.engine_version,
1318                &source_instance.master_username,
1319                &source_instance.master_user_password,
1320                &db_name,
1321            )
1322            .await
1323        {
1324            Ok(running) => running,
1325            Err(e) => {
1326                self.state
1327                    .write()
1328                    .get_or_create(&request.account_id)
1329                    .cancel_instance_creation(&db_instance_identifier);
1330                return Err(runtime_error_to_service_error(e));
1331            }
1332        };
1333
1334        if let Err(e) = runtime
1335            .restore_database(
1336                &db_instance_identifier,
1337                &source_instance.engine,
1338                &source_instance.master_username,
1339                &source_instance.master_user_password,
1340                &db_name,
1341                &dump_data,
1342            )
1343            .await
1344        {
1345            self.state
1346                .write()
1347                .get_or_create(&request.account_id)
1348                .cancel_instance_creation(&db_instance_identifier);
1349            runtime.stop_container(&db_instance_identifier).await;
1350            return Err(runtime_error_to_service_error(e));
1351        }
1352
1353        let replica = build_read_replica_instance(
1354            &db_instance_identifier,
1355            db_instance_arn,
1356            dbi_resource_id,
1357            created_at,
1358            &source_db_instance_identifier,
1359            &source_instance,
1360            &running,
1361        );
1362
1363        let source_missing = {
1364            let mut accounts = self.state.write();
1365            let state = accounts.get_or_create(&request.account_id);
1366            match state.instances.get_mut(&source_db_instance_identifier) {
1367                Some(source) => {
1368                    source
1369                        .read_replica_db_instance_identifiers
1370                        .push(db_instance_identifier.clone());
1371                    state.finish_instance_creation(replica.clone());
1372                    false
1373                }
1374                None => {
1375                    state.cancel_instance_creation(&db_instance_identifier);
1376                    true
1377                }
1378            }
1379        };
1380
1381        if source_missing {
1382            runtime.stop_container(&db_instance_identifier).await;
1383            return Err(db_instance_not_found(&source_db_instance_identifier));
1384        }
1385
1386        Ok(AwsResponse::xml(
1387            StatusCode::OK,
1388            xml_wrap(
1389                "CreateDBInstanceReadReplica",
1390                &format!(
1391                    "<DBInstance>{}</DBInstance>",
1392                    db_instance_xml(&replica, None)
1393                ),
1394                &request.request_id,
1395            ),
1396        ))
1397    }
1398
1399    fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1400        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1401        let db_subnet_group_description = required_param(request, "DBSubnetGroupDescription")?;
1402        let subnet_ids = parse_subnet_ids(request)?;
1403
1404        if subnet_ids.is_empty() {
1405            return Err(AwsServiceError::aws_error(
1406                StatusCode::BAD_REQUEST,
1407                "InvalidParameterValue",
1408                "At least one subnet must be specified.",
1409            ));
1410        }
1411
1412        if subnet_ids.len() < 2 {
1413            return Err(AwsServiceError::aws_error(
1414                StatusCode::BAD_REQUEST,
1415                "DBSubnetGroupDoesNotCoverEnoughAZs",
1416                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1417            ));
1418        }
1419
1420        let mut accounts = self.state.write();
1421        let state = accounts.get_or_create(&request.account_id);
1422
1423        if state.subnet_groups.contains_key(&db_subnet_group_name) {
1424            return Err(AwsServiceError::aws_error(
1425                StatusCode::CONFLICT,
1426                "DBSubnetGroupAlreadyExists",
1427                format!("DBSubnetGroup {db_subnet_group_name} already exists."),
1428            ));
1429        }
1430
1431        let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
1432        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1433            .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
1434            .collect();
1435
1436        // Validate that subnets span at least 2 unique Availability Zones
1437        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1438        if unique_azs.len() < 2 {
1439            return Err(AwsServiceError::aws_error(
1440                StatusCode::BAD_REQUEST,
1441                "DBSubnetGroupDoesNotCoverEnoughAZs",
1442                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1443            ));
1444        }
1445
1446        let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
1447        let tags = parse_tags(request)?;
1448
1449        let subnet_group = DbSubnetGroup {
1450            db_subnet_group_name: db_subnet_group_name.clone(),
1451            db_subnet_group_arn,
1452            db_subnet_group_description,
1453            vpc_id,
1454            subnet_ids,
1455            subnet_availability_zones,
1456            tags,
1457        };
1458
1459        state
1460            .subnet_groups
1461            .insert(db_subnet_group_name, subnet_group.clone());
1462
1463        Ok(AwsResponse::xml(
1464            StatusCode::OK,
1465            xml_wrap(
1466                "CreateDBSubnetGroup",
1467                &format!(
1468                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1469                    db_subnet_group_xml(&subnet_group)
1470                ),
1471                &request.request_id,
1472            ),
1473        ))
1474    }
1475
1476    fn describe_db_subnet_groups(
1477        &self,
1478        request: &AwsRequest,
1479    ) -> Result<AwsResponse, AwsServiceError> {
1480        let db_subnet_group_name = optional_param(request, "DBSubnetGroupName");
1481        let marker = optional_param(request, "Marker");
1482        let max_records = optional_param(request, "MaxRecords");
1483
1484        let accounts = self.state.read();
1485        let empty = RdsState::new(&request.account_id, &request.region);
1486        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1487
1488        // If specific subnet group requested, return just that one (no pagination)
1489        if let Some(name) = db_subnet_group_name {
1490            let sg = state.subnet_groups.get(&name).ok_or_else(|| {
1491                AwsServiceError::aws_error(
1492                    StatusCode::NOT_FOUND,
1493                    "DBSubnetGroupNotFoundFault",
1494                    format!("DBSubnetGroup {} not found.", name),
1495                )
1496            })?;
1497
1498            return Ok(AwsResponse::xml(
1499                StatusCode::OK,
1500                xml_wrap(
1501                    "DescribeDBSubnetGroups",
1502                    &format!(
1503                        "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
1504                        db_subnet_group_xml(sg)
1505                    ),
1506                    &request.request_id,
1507                ),
1508            ));
1509        }
1510
1511        // Get all subnet groups sorted by name
1512        let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
1513        subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
1514
1515        // Apply pagination
1516        let paginated = paginate(subnet_groups, marker, max_records, |sg| {
1517            &sg.db_subnet_group_name
1518        })?;
1519
1520        let marker_xml = paginated
1521            .next_marker
1522            .as_ref()
1523            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1524            .unwrap_or_default();
1525
1526        let body = paginated
1527            .items
1528            .iter()
1529            .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
1530            .collect::<Vec<_>>()
1531            .join("");
1532
1533        Ok(AwsResponse::xml(
1534            StatusCode::OK,
1535            xml_wrap(
1536                "DescribeDBSubnetGroups",
1537                &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
1538                &request.request_id,
1539            ),
1540        ))
1541    }
1542
1543    fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1544        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1545
1546        let mut accounts = self.state.write();
1547        let state = accounts.get_or_create(&request.account_id);
1548
1549        if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
1550            return Err(AwsServiceError::aws_error(
1551                StatusCode::NOT_FOUND,
1552                "DBSubnetGroupNotFoundFault",
1553                format!("DBSubnetGroup {db_subnet_group_name} not found."),
1554            ));
1555        }
1556
1557        Ok(AwsResponse::xml(
1558            StatusCode::OK,
1559            xml_wrap("DeleteDBSubnetGroup", "", &request.request_id),
1560        ))
1561    }
1562
1563    fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1564        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1565        let subnet_ids = parse_subnet_ids(request)?;
1566
1567        if subnet_ids.is_empty() {
1568            return Err(AwsServiceError::aws_error(
1569                StatusCode::BAD_REQUEST,
1570                "InvalidParameterValue",
1571                "At least one subnet must be specified.",
1572            ));
1573        }
1574
1575        if subnet_ids.len() < 2 {
1576            return Err(AwsServiceError::aws_error(
1577                StatusCode::BAD_REQUEST,
1578                "DBSubnetGroupDoesNotCoverEnoughAZs",
1579                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1580            ));
1581        }
1582
1583        let mut accounts = self.state.write();
1584        let state = accounts.get_or_create(&request.account_id);
1585
1586        let region = state.region.clone();
1587
1588        let subnet_group = state
1589            .subnet_groups
1590            .get_mut(&db_subnet_group_name)
1591            .ok_or_else(|| {
1592                AwsServiceError::aws_error(
1593                    StatusCode::NOT_FOUND,
1594                    "DBSubnetGroupNotFoundFault",
1595                    format!("DBSubnetGroup {db_subnet_group_name} not found."),
1596                )
1597            })?;
1598
1599        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1600            .map(|i| format!("{}{}", &region, char::from(b'a' + (i % 6) as u8)))
1601            .collect();
1602
1603        // Validate that subnets span at least 2 unique Availability Zones
1604        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1605        if unique_azs.len() < 2 {
1606            return Err(AwsServiceError::aws_error(
1607                StatusCode::BAD_REQUEST,
1608                "DBSubnetGroupDoesNotCoverEnoughAZs",
1609                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1610            ));
1611        }
1612
1613        subnet_group.subnet_ids = subnet_ids;
1614        subnet_group.subnet_availability_zones = subnet_availability_zones;
1615
1616        let subnet_group_clone = subnet_group.clone();
1617
1618        Ok(AwsResponse::xml(
1619            StatusCode::OK,
1620            xml_wrap(
1621                "ModifyDBSubnetGroup",
1622                &format!(
1623                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1624                    db_subnet_group_xml(&subnet_group_clone)
1625                ),
1626                &request.request_id,
1627            ),
1628        ))
1629    }
1630
1631    fn create_db_parameter_group(
1632        &self,
1633        request: &AwsRequest,
1634    ) -> Result<AwsResponse, AwsServiceError> {
1635        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1636        let db_parameter_group_family = required_param(request, "DBParameterGroupFamily")?;
1637        let description = required_param(request, "Description")?;
1638
1639        // Validate parameter group family against supported engines and versions
1640        let valid_families = [
1641            "postgres16",
1642            "postgres15",
1643            "postgres14",
1644            "postgres13",
1645            "mysql8.0",
1646            "mysql5.7",
1647            "mariadb10.11",
1648            "mariadb10.6",
1649        ];
1650
1651        if !valid_families.contains(&db_parameter_group_family.as_str()) {
1652            return Err(AwsServiceError::aws_error(
1653                StatusCode::BAD_REQUEST,
1654                "InvalidParameterValue",
1655                format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
1656            ));
1657        }
1658
1659        let mut accounts = self.state.write();
1660        let state = accounts.get_or_create(&request.account_id);
1661
1662        if state
1663            .parameter_groups
1664            .contains_key(&db_parameter_group_name)
1665        {
1666            return Err(AwsServiceError::aws_error(
1667                StatusCode::CONFLICT,
1668                "DBParameterGroupAlreadyExists",
1669                format!("DBParameterGroup {db_parameter_group_name} already exists."),
1670            ));
1671        }
1672
1673        let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
1674        let tags = parse_tags(request)?;
1675
1676        let parameter_group = DbParameterGroup {
1677            db_parameter_group_name: db_parameter_group_name.clone(),
1678            db_parameter_group_arn,
1679            db_parameter_group_family,
1680            description,
1681            parameters: std::collections::HashMap::new(),
1682            tags,
1683        };
1684
1685        state
1686            .parameter_groups
1687            .insert(db_parameter_group_name, parameter_group.clone());
1688
1689        Ok(AwsResponse::xml(
1690            StatusCode::OK,
1691            xml_wrap(
1692                "CreateDBParameterGroup",
1693                &format!(
1694                    "<DBParameterGroup>{}</DBParameterGroup>",
1695                    db_parameter_group_xml(&parameter_group)
1696                ),
1697                &request.request_id,
1698            ),
1699        ))
1700    }
1701
1702    fn describe_db_parameter_groups(
1703        &self,
1704        request: &AwsRequest,
1705    ) -> Result<AwsResponse, AwsServiceError> {
1706        let db_parameter_group_name = optional_param(request, "DBParameterGroupName");
1707        let marker = optional_param(request, "Marker");
1708        let max_records = optional_param(request, "MaxRecords");
1709
1710        let accounts = self.state.read();
1711        let empty = RdsState::new(&request.account_id, &request.region);
1712        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1713
1714        // If specific parameter group requested, return just that one (no pagination)
1715        if let Some(name) = db_parameter_group_name {
1716            let pg = state.parameter_groups.get(&name).ok_or_else(|| {
1717                AwsServiceError::aws_error(
1718                    StatusCode::NOT_FOUND,
1719                    "DBParameterGroupNotFound",
1720                    format!("DBParameterGroup {} not found.", name),
1721                )
1722            })?;
1723
1724            return Ok(AwsResponse::xml(
1725                StatusCode::OK,
1726                xml_wrap(
1727                    "DescribeDBParameterGroups",
1728                    &format!(
1729                        "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
1730                        db_parameter_group_xml(pg)
1731                    ),
1732                    &request.request_id,
1733                ),
1734            ));
1735        }
1736
1737        // Get all parameter groups sorted by name
1738        let mut parameter_groups: Vec<DbParameterGroup> =
1739            state.parameter_groups.values().cloned().collect();
1740        parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
1741
1742        // Apply pagination
1743        let paginated = paginate(parameter_groups, marker, max_records, |pg| {
1744            &pg.db_parameter_group_name
1745        })?;
1746
1747        let marker_xml = paginated
1748            .next_marker
1749            .as_ref()
1750            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1751            .unwrap_or_default();
1752
1753        let body = paginated
1754            .items
1755            .iter()
1756            .map(|pg| {
1757                format!(
1758                    "<DBParameterGroup>{}</DBParameterGroup>",
1759                    db_parameter_group_xml(pg)
1760                )
1761            })
1762            .collect::<Vec<_>>()
1763            .join("");
1764
1765        Ok(AwsResponse::xml(
1766            StatusCode::OK,
1767            xml_wrap(
1768                "DescribeDBParameterGroups",
1769                &format!(
1770                    "<DBParameterGroups>{}</DBParameterGroups>{}",
1771                    body, marker_xml
1772                ),
1773                &request.request_id,
1774            ),
1775        ))
1776    }
1777
1778    fn delete_db_parameter_group(
1779        &self,
1780        request: &AwsRequest,
1781    ) -> Result<AwsResponse, AwsServiceError> {
1782        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1783
1784        let mut accounts = self.state.write();
1785        let state = accounts.get_or_create(&request.account_id);
1786
1787        if db_parameter_group_name.starts_with("default.") {
1788            return Err(AwsServiceError::aws_error(
1789                StatusCode::BAD_REQUEST,
1790                "InvalidParameterValue",
1791                "Cannot delete default parameter groups.",
1792            ));
1793        }
1794
1795        if state
1796            .parameter_groups
1797            .remove(&db_parameter_group_name)
1798            .is_none()
1799        {
1800            return Err(AwsServiceError::aws_error(
1801                StatusCode::NOT_FOUND,
1802                "DBParameterGroupNotFound",
1803                format!("DBParameterGroup {db_parameter_group_name} not found."),
1804            ));
1805        }
1806
1807        Ok(AwsResponse::xml(
1808            StatusCode::OK,
1809            xml_wrap("DeleteDBParameterGroup", "", &request.request_id),
1810        ))
1811    }
1812
1813    fn modify_db_parameter_group(
1814        &self,
1815        request: &AwsRequest,
1816    ) -> Result<AwsResponse, AwsServiceError> {
1817        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1818
1819        let mut accounts = self.state.write();
1820        let state = accounts.get_or_create(&request.account_id);
1821
1822        let parameter_group = state
1823            .parameter_groups
1824            .get_mut(&db_parameter_group_name)
1825            .ok_or_else(|| {
1826                AwsServiceError::aws_error(
1827                    StatusCode::NOT_FOUND,
1828                    "DBParameterGroupNotFound",
1829                    format!("DBParameterGroup {db_parameter_group_name} not found."),
1830                )
1831            })?;
1832
1833        if let Some(new_description) = optional_param(request, "Description") {
1834            parameter_group.description = new_description;
1835        }
1836
1837        let parameter_group_clone = parameter_group.clone();
1838
1839        Ok(AwsResponse::xml(
1840            StatusCode::OK,
1841            xml_wrap(
1842                "ModifyDBParameterGroup",
1843                &format!(
1844                    "<DBParameterGroupName>{}</DBParameterGroupName>",
1845                    xml_escape(&parameter_group_clone.db_parameter_group_name)
1846                ),
1847                &request.request_id,
1848            ),
1849        ))
1850    }
1851}
1852
1853fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
1854    fakecloud_core::query::optional_query_param(req, name)
1855}
1856
1857fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
1858    fakecloud_core::query::required_query_param(req, name)
1859}
1860
1861fn required_i32_param(req: &AwsRequest, name: &str) -> Result<i32, AwsServiceError> {
1862    let value = required_param(req, name)?;
1863    value.parse::<i32>().map_err(|_| {
1864        AwsServiceError::aws_error(
1865            StatusCode::BAD_REQUEST,
1866            "InvalidParameterValue",
1867            format!("Parameter {name} must be a valid integer."),
1868        )
1869    })
1870}
1871
1872fn optional_i32_param(req: &AwsRequest, name: &str) -> Result<Option<i32>, AwsServiceError> {
1873    optional_param(req, name)
1874        .map(|value| {
1875            value.parse::<i32>().map_err(|_| {
1876                AwsServiceError::aws_error(
1877                    StatusCode::BAD_REQUEST,
1878                    "InvalidParameterValue",
1879                    format!("Parameter {name} must be a valid integer."),
1880                )
1881            })
1882        })
1883        .transpose()
1884}
1885
1886fn parse_tags(req: &AwsRequest) -> Result<Vec<RdsTag>, AwsServiceError> {
1887    let mut tags = Vec::new();
1888    for index in 1.. {
1889        let key_name = format!("Tags.Tag.{index}.Key");
1890        let value_name = format!("Tags.Tag.{index}.Value");
1891        let key = optional_param(req, &key_name);
1892        let value = optional_param(req, &value_name);
1893
1894        match (key, value) {
1895            (Some(key), Some(value)) => tags.push(RdsTag { key, value }),
1896            (None, None) => break,
1897            _ => {
1898                return Err(AwsServiceError::aws_error(
1899                    StatusCode::BAD_REQUEST,
1900                    "InvalidParameterValue",
1901                    "Each tag must include both Key and Value.",
1902                ));
1903            }
1904        }
1905    }
1906
1907    Ok(tags)
1908}
1909
1910fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
1911    let mut keys = Vec::new();
1912    for index in 1.. {
1913        let key_name = format!("TagKeys.member.{index}");
1914        match optional_param(req, &key_name) {
1915            Some(key) => keys.push(key),
1916            None => break,
1917        }
1918    }
1919
1920    Ok(keys)
1921}
1922
1923fn parse_subnet_ids(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
1924    let mut subnet_ids = Vec::new();
1925    for index in 1.. {
1926        let subnet_id_name = format!("SubnetIds.SubnetIdentifier.{index}");
1927        match optional_param(req, &subnet_id_name) {
1928            Some(subnet_id) => subnet_ids.push(subnet_id),
1929            None => break,
1930        }
1931    }
1932
1933    Ok(subnet_ids)
1934}
1935
1936fn parse_vpc_security_group_ids(req: &AwsRequest) -> Vec<String> {
1937    let mut security_group_ids = Vec::new();
1938    for index in 1.. {
1939        let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
1940        match optional_param(req, &sg_id_name) {
1941            Some(sg_id) => security_group_ids.push(sg_id),
1942            None => break,
1943        }
1944    }
1945
1946    // If no security groups provided, return a default one
1947    if security_group_ids.is_empty() {
1948        security_group_ids.push("sg-default".to_string());
1949    }
1950
1951    security_group_ids
1952}
1953
1954fn query_param_prefix_exists(req: &AwsRequest, prefix: &str) -> bool {
1955    req.query_params.keys().any(|key| key.starts_with(prefix))
1956}
1957
1958fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
1959    value
1960        .map(|raw| match raw {
1961            "true" | "True" | "TRUE" => Ok(true),
1962            "false" | "False" | "FALSE" => Ok(false),
1963            _ => Err(AwsServiceError::aws_error(
1964                StatusCode::BAD_REQUEST,
1965                "InvalidParameterValue",
1966                format!("Boolean parameter value '{raw}' is invalid."),
1967            )),
1968        })
1969        .transpose()
1970}
1971
1972struct PaginationResult<T> {
1973    items: Vec<T>,
1974    next_marker: Option<String>,
1975}
1976
1977fn paginate<T, F>(
1978    mut items: Vec<T>,
1979    marker: Option<String>,
1980    max_records: Option<String>,
1981    get_id: F,
1982) -> Result<PaginationResult<T>, AwsServiceError>
1983where
1984    F: Fn(&T) -> &str,
1985{
1986    // Parse max_records with default 100, max 100
1987    let max = if let Some(max_str) = max_records {
1988        let parsed = max_str.parse::<i32>().map_err(|_| {
1989            AwsServiceError::aws_error(
1990                StatusCode::BAD_REQUEST,
1991                "InvalidParameterValue",
1992                "MaxRecords must be a valid integer.",
1993            )
1994        })?;
1995        if !(1..=100).contains(&parsed) {
1996            return Err(AwsServiceError::aws_error(
1997                StatusCode::BAD_REQUEST,
1998                "InvalidParameterValue",
1999                "MaxRecords must be between 1 and 100.",
2000            ));
2001        }
2002        parsed as usize
2003    } else {
2004        100
2005    };
2006
2007    // Decode marker to get starting identifier
2008    let start_id = if let Some(encoded_marker) = marker {
2009        let decoded = BASE64.decode(encoded_marker.as_bytes()).map_err(|_| {
2010            AwsServiceError::aws_error(
2011                StatusCode::BAD_REQUEST,
2012                "InvalidParameterValue",
2013                "Marker is invalid.",
2014            )
2015        })?;
2016        let id = String::from_utf8(decoded).map_err(|_| {
2017            AwsServiceError::aws_error(
2018                StatusCode::BAD_REQUEST,
2019                "InvalidParameterValue",
2020                "Marker is invalid.",
2021            )
2022        })?;
2023        Some(id)
2024    } else {
2025        None
2026    };
2027
2028    // Find starting position
2029    let start_index = if let Some(ref start_id) = start_id {
2030        items
2031            .iter()
2032            .position(|item| get_id(item) == start_id)
2033            .map(|pos| pos + 1) // Start after the marker
2034            .unwrap_or(items.len()) // If not found, return empty result
2035    } else {
2036        0
2037    };
2038
2039    // Take items from start_index
2040    let total_items = items.len();
2041    let end_index = std::cmp::min(start_index + max, total_items);
2042    let paginated_items: Vec<T> = items.drain(start_index..end_index).collect();
2043
2044    // Create next marker if there are more items
2045    let next_marker = if end_index < total_items {
2046        paginated_items
2047            .last()
2048            .map(|item| BASE64.encode(get_id(item).as_bytes()))
2049    } else {
2050        None
2051    };
2052
2053    Ok(PaginationResult {
2054        items: paginated_items,
2055        next_marker,
2056    })
2057}
2058
2059fn validate_create_request(
2060    db_instance_identifier: &str,
2061    allocated_storage: i32,
2062    db_instance_class: &str,
2063    engine: &str,
2064    engine_version: &str,
2065    port: i32,
2066) -> Result<(), AwsServiceError> {
2067    if allocated_storage <= 0 {
2068        return Err(AwsServiceError::aws_error(
2069            StatusCode::BAD_REQUEST,
2070            "InvalidParameterValue",
2071            "AllocatedStorage must be greater than zero.",
2072        ));
2073    }
2074    if port <= 0 {
2075        return Err(AwsServiceError::aws_error(
2076            StatusCode::BAD_REQUEST,
2077            "InvalidParameterValue",
2078            "Port must be greater than zero.",
2079        ));
2080    }
2081    if !db_instance_identifier
2082        .chars()
2083        .all(|ch| ch.is_ascii_alphanumeric() || ch == '-')
2084    {
2085        return Err(AwsServiceError::aws_error(
2086            StatusCode::BAD_REQUEST,
2087            "InvalidParameterValue",
2088            "DBInstanceIdentifier must contain only alphanumeric characters or hyphens.",
2089        ));
2090    }
2091    // Validate engine
2092    let supported_engines = ["postgres", "mysql", "mariadb"];
2093    if !supported_engines.contains(&engine) {
2094        return Err(AwsServiceError::aws_error(
2095            StatusCode::BAD_REQUEST,
2096            "InvalidParameterValue",
2097            format!("Engine '{}' is not supported.", engine),
2098        ));
2099    }
2100
2101    // Validate engine version
2102    let supported_versions = match engine {
2103        "postgres" => vec!["16.3", "15.5", "14.10", "13.13"],
2104        "mysql" => vec!["8.0.35", "8.0.28", "5.7.44"],
2105        "mariadb" => vec!["10.11.6", "10.6.16"],
2106        _ => vec![],
2107    };
2108
2109    if !supported_versions.contains(&engine_version) {
2110        return Err(AwsServiceError::aws_error(
2111            StatusCode::BAD_REQUEST,
2112            "InvalidParameterValue",
2113            format!("EngineVersion '{engine_version}' is not supported yet."),
2114        ));
2115    }
2116    validate_db_instance_class(db_instance_class)?;
2117    Ok(())
2118}
2119
2120fn validate_db_instance_class(db_instance_class: &str) -> Result<(), AwsServiceError> {
2121    if !crate::state::SUPPORTED_INSTANCE_CLASSES.contains(&db_instance_class) {
2122        return Err(AwsServiceError::aws_error(
2123            StatusCode::BAD_REQUEST,
2124            "InvalidParameterValue",
2125            format!("DBInstanceClass '{}' is not supported.", db_instance_class),
2126        ));
2127    }
2128    Ok(())
2129}
2130
2131fn filter_engine_versions(
2132    versions: &[EngineVersionInfo],
2133    engine: &Option<String>,
2134    engine_version: &Option<String>,
2135    family: &Option<String>,
2136) -> Vec<EngineVersionInfo> {
2137    versions
2138        .iter()
2139        .filter(|candidate| {
2140            engine
2141                .as_ref()
2142                .is_none_or(|expected| candidate.engine == *expected)
2143        })
2144        .filter(|candidate| {
2145            engine_version
2146                .as_ref()
2147                .is_none_or(|expected| candidate.engine_version == *expected)
2148        })
2149        .filter(|candidate| {
2150            family
2151                .as_ref()
2152                .is_none_or(|expected| candidate.db_parameter_group_family == *expected)
2153        })
2154        .cloned()
2155        .collect()
2156}
2157
2158fn filter_orderable_options(
2159    options: &[OrderableDbInstanceOption],
2160    engine: &Option<String>,
2161    engine_version: &Option<String>,
2162    db_instance_class: &Option<String>,
2163    license_model: &Option<String>,
2164    vpc: Option<bool>,
2165) -> Vec<OrderableDbInstanceOption> {
2166    options
2167        .iter()
2168        .filter(|candidate| {
2169            engine
2170                .as_ref()
2171                .is_none_or(|expected| candidate.engine == *expected)
2172        })
2173        .filter(|candidate| {
2174            engine_version
2175                .as_ref()
2176                .is_none_or(|expected| candidate.engine_version == *expected)
2177        })
2178        .filter(|candidate| {
2179            db_instance_class
2180                .as_ref()
2181                .is_none_or(|expected| candidate.db_instance_class == *expected)
2182        })
2183        .filter(|candidate| {
2184            license_model
2185                .as_ref()
2186                .is_none_or(|expected| candidate.license_model == *expected)
2187        })
2188        .filter(|_| vpc.unwrap_or(true))
2189        .cloned()
2190        .collect()
2191}
2192
2193/// Build a `DbInstance` for a newly-created read replica, copying the
2194/// source instance's physical attributes and binding the replica's
2195/// identifier, ARN, resource id, container id and host port.
2196#[allow(clippy::too_many_arguments)]
2197/// Build a `DbInstance` from a restored snapshot. Copies the physical
2198/// attributes off the snapshot and binds the new instance's identifier,
2199/// ARN, resource id, container id and host port.
2200fn build_restored_instance(
2201    db_instance_identifier: &str,
2202    db_instance_arn: String,
2203    dbi_resource_id: String,
2204    created_at: chrono::DateTime<Utc>,
2205    vpc_security_group_ids: Vec<String>,
2206    snapshot: &DbSnapshot,
2207    running: &crate::runtime::RunningDbContainer,
2208) -> DbInstance {
2209    DbInstance {
2210        db_instance_identifier: db_instance_identifier.to_string(),
2211        db_instance_arn,
2212        db_instance_class: "db.t3.micro".to_string(),
2213        engine: snapshot.engine.clone(),
2214        engine_version: snapshot.engine_version.clone(),
2215        db_instance_status: "available".to_string(),
2216        master_username: snapshot.master_username.clone(),
2217        db_name: snapshot.db_name.clone(),
2218        endpoint_address: "127.0.0.1".to_string(),
2219        port: i32::from(running.host_port),
2220        allocated_storage: snapshot.allocated_storage,
2221        publicly_accessible: true,
2222        deletion_protection: false,
2223        created_at,
2224        dbi_resource_id,
2225        master_user_password: snapshot.master_user_password.clone(),
2226        container_id: running.container_id.clone(),
2227        host_port: running.host_port,
2228        tags: Vec::new(),
2229        read_replica_source_db_instance_identifier: None,
2230        read_replica_db_instance_identifiers: Vec::new(),
2231        vpc_security_group_ids,
2232        db_parameter_group_name: None,
2233        backup_retention_period: 1,
2234        preferred_backup_window: "03:00-04:00".to_string(),
2235        latest_restorable_time: Some(created_at),
2236        option_group_name: None,
2237        multi_az: false,
2238        pending_modified_values: None,
2239    }
2240}
2241
2242fn build_read_replica_instance(
2243    db_instance_identifier: &str,
2244    db_instance_arn: String,
2245    dbi_resource_id: String,
2246    created_at: chrono::DateTime<Utc>,
2247    source_db_instance_identifier: &str,
2248    source: &DbInstance,
2249    running: &crate::runtime::RunningDbContainer,
2250) -> DbInstance {
2251    DbInstance {
2252        db_instance_identifier: db_instance_identifier.to_string(),
2253        db_instance_arn,
2254        db_instance_class: source.db_instance_class.clone(),
2255        engine: source.engine.clone(),
2256        engine_version: source.engine_version.clone(),
2257        db_instance_status: "available".to_string(),
2258        master_username: source.master_username.clone(),
2259        db_name: source.db_name.clone(),
2260        endpoint_address: "127.0.0.1".to_string(),
2261        port: i32::from(running.host_port),
2262        allocated_storage: source.allocated_storage,
2263        publicly_accessible: source.publicly_accessible,
2264        deletion_protection: false,
2265        created_at,
2266        dbi_resource_id,
2267        master_user_password: source.master_user_password.clone(),
2268        container_id: running.container_id.clone(),
2269        host_port: running.host_port,
2270        tags: Vec::new(),
2271        read_replica_source_db_instance_identifier: Some(source_db_instance_identifier.to_string()),
2272        read_replica_db_instance_identifiers: Vec::new(),
2273        vpc_security_group_ids: source.vpc_security_group_ids.clone(),
2274        db_parameter_group_name: source.db_parameter_group_name.clone(),
2275        backup_retention_period: source.backup_retention_period,
2276        preferred_backup_window: source.preferred_backup_window.clone(),
2277        latest_restorable_time: if source.backup_retention_period > 0 {
2278            Some(created_at)
2279        } else {
2280            None
2281        },
2282        option_group_name: source.option_group_name.clone(),
2283        multi_az: source.multi_az,
2284        pending_modified_values: None,
2285    }
2286}
2287
2288fn xml_wrap(action: &str, inner: &str, request_id: &str) -> String {
2289    fakecloud_core::query::query_response_xml(action, RDS_NS, inner, request_id)
2290}
2291
2292fn engine_version_xml(version: &EngineVersionInfo) -> String {
2293    format!(
2294        "<DBEngineVersion>\
2295         <Engine>{}</Engine>\
2296         <EngineVersion>{}</EngineVersion>\
2297         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2298         <DBEngineDescription>{}</DBEngineDescription>\
2299         <DBEngineVersionDescription>{}</DBEngineVersionDescription>\
2300         <Status>{}</Status>\
2301         </DBEngineVersion>",
2302        xml_escape(&version.engine),
2303        xml_escape(&version.engine_version),
2304        xml_escape(&version.db_parameter_group_family),
2305        xml_escape(&version.db_engine_description),
2306        xml_escape(&version.db_engine_version_description),
2307        xml_escape(&version.status),
2308    )
2309}
2310
2311fn orderable_option_xml(option: &OrderableDbInstanceOption) -> String {
2312    format!(
2313        "<OrderableDBInstanceOption>\
2314         <Engine>{}</Engine>\
2315         <EngineVersion>{}</EngineVersion>\
2316         <DBInstanceClass>{}</DBInstanceClass>\
2317         <LicenseModel>{}</LicenseModel>\
2318         <AvailabilityZones><AvailabilityZone><Name>us-east-1a</Name></AvailabilityZone></AvailabilityZones>\
2319         <MultiAZCapable>true</MultiAZCapable>\
2320         <ReadReplicaCapable>true</ReadReplicaCapable>\
2321         <Vpc>true</Vpc>\
2322         <SupportsStorageEncryption>true</SupportsStorageEncryption>\
2323         <StorageType>{}</StorageType>\
2324         <SupportsIops>false</SupportsIops>\
2325         <MinStorageSize>{}</MinStorageSize>\
2326         <MaxStorageSize>{}</MaxStorageSize>\
2327         <SupportsIAMDatabaseAuthentication>true</SupportsIAMDatabaseAuthentication>\
2328         </OrderableDBInstanceOption>",
2329        xml_escape(&option.engine),
2330        xml_escape(&option.engine_version),
2331        xml_escape(&option.db_instance_class),
2332        xml_escape(&option.license_model),
2333        xml_escape(&option.storage_type),
2334        option.min_storage_size,
2335        option.max_storage_size,
2336    )
2337}
2338
2339fn tag_xml(tag: &RdsTag) -> String {
2340    format!(
2341        "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2342        xml_escape(&tag.key),
2343        xml_escape(&tag.value),
2344    )
2345}
2346
2347fn db_instance_xml(instance: &DbInstance, status_override: Option<&str>) -> String {
2348    let status = status_override.unwrap_or(&instance.db_instance_status);
2349    let db_name_xml = instance
2350        .db_name
2351        .as_ref()
2352        .map(|db_name| format!("<DBName>{}</DBName>", xml_escape(db_name)))
2353        .unwrap_or_default();
2354
2355    let read_replica_source_xml = instance
2356        .read_replica_source_db_instance_identifier
2357        .as_ref()
2358        .map(|source| {
2359            format!(
2360                "<ReadReplicaSourceDBInstanceIdentifier>{}</ReadReplicaSourceDBInstanceIdentifier>",
2361                xml_escape(source)
2362            )
2363        })
2364        .unwrap_or_default();
2365
2366    let read_replica_identifiers_xml = if instance.read_replica_db_instance_identifiers.is_empty() {
2367        "<ReadReplicaDBInstanceIdentifiers/>".to_string()
2368    } else {
2369        format!(
2370            "<ReadReplicaDBInstanceIdentifiers>{}</ReadReplicaDBInstanceIdentifiers>",
2371            instance
2372                .read_replica_db_instance_identifiers
2373                .iter()
2374                .map(|id| format!(
2375                    "<ReadReplicaDBInstanceIdentifier>{}</ReadReplicaDBInstanceIdentifier>",
2376                    xml_escape(id)
2377                ))
2378                .collect::<String>()
2379        )
2380    };
2381
2382    let vpc_security_groups_xml = if instance.vpc_security_group_ids.is_empty() {
2383        "<VpcSecurityGroups/>".to_string()
2384    } else {
2385        format!(
2386            "<VpcSecurityGroups>{}</VpcSecurityGroups>",
2387            instance
2388                .vpc_security_group_ids
2389                .iter()
2390                .map(|sg_id| format!(
2391                    "<VpcSecurityGroupMembership>\
2392                     <VpcSecurityGroupId>{}</VpcSecurityGroupId>\
2393                     <Status>active</Status>\
2394                     </VpcSecurityGroupMembership>",
2395                    xml_escape(sg_id)
2396                ))
2397                .collect::<String>()
2398        )
2399    };
2400
2401    let db_parameter_groups_xml = match &instance.db_parameter_group_name {
2402        Some(pg_name) => format!(
2403            "<DBParameterGroups>\
2404             <DBParameterGroup>\
2405             <DBParameterGroupName>{}</DBParameterGroupName>\
2406             <ParameterApplyStatus>in-sync</ParameterApplyStatus>\
2407             </DBParameterGroup>\
2408             </DBParameterGroups>",
2409            xml_escape(pg_name)
2410        ),
2411        None => "<DBParameterGroups/>".to_string(),
2412    };
2413
2414    let option_group_memberships_xml = match &instance.option_group_name {
2415        Some(og_name) => format!(
2416            "<OptionGroupMemberships>\
2417             <OptionGroupMembership>\
2418             <OptionGroupName>{}</OptionGroupName>\
2419             <Status>in-sync</Status>\
2420             </OptionGroupMembership>\
2421             </OptionGroupMemberships>",
2422            xml_escape(og_name)
2423        ),
2424        None => "<OptionGroupMemberships/>".to_string(),
2425    };
2426
2427    let pending_modified_values_xml = if let Some(ref pending) = instance.pending_modified_values {
2428        let mut fields = Vec::new();
2429        if let Some(ref class) = pending.db_instance_class {
2430            fields.push(format!(
2431                "<DBInstanceClass>{}</DBInstanceClass>",
2432                xml_escape(class)
2433            ));
2434        }
2435        if let Some(allocated_storage) = pending.allocated_storage {
2436            fields.push(format!(
2437                "<AllocatedStorage>{}</AllocatedStorage>",
2438                allocated_storage
2439            ));
2440        }
2441        if let Some(backup_retention_period) = pending.backup_retention_period {
2442            fields.push(format!(
2443                "<BackupRetentionPeriod>{}</BackupRetentionPeriod>",
2444                backup_retention_period
2445            ));
2446        }
2447        if let Some(multi_az) = pending.multi_az {
2448            fields.push(format!(
2449                "<MultiAZ>{}</MultiAZ>",
2450                if multi_az { "true" } else { "false" }
2451            ));
2452        }
2453        if let Some(ref engine_version) = pending.engine_version {
2454            fields.push(format!(
2455                "<EngineVersion>{}</EngineVersion>",
2456                xml_escape(engine_version)
2457            ));
2458        }
2459        if pending.master_user_password.is_some() {
2460            fields.push("<MasterUserPassword>****</MasterUserPassword>".to_string());
2461        }
2462        if !fields.is_empty() {
2463            format!(
2464                "<PendingModifiedValues>{}</PendingModifiedValues>",
2465                fields.join("")
2466            )
2467        } else {
2468            String::new()
2469        }
2470    } else {
2471        String::new()
2472    };
2473
2474    let latest_restorable_time_xml = instance
2475        .latest_restorable_time
2476        .map(|t| {
2477            format!(
2478                "<LatestRestorableTime>{}</LatestRestorableTime>",
2479                t.to_rfc3339()
2480            )
2481        })
2482        .unwrap_or_default();
2483
2484    format!(
2485        "<DBInstanceIdentifier>{identifier}</DBInstanceIdentifier>\
2486         <DBInstanceClass>{class}</DBInstanceClass>\
2487         <Engine>{engine}</Engine>\
2488         <DBInstanceStatus>{status}</DBInstanceStatus>\
2489         <MasterUsername>{master_username}</MasterUsername>\
2490         {db_name_xml}\
2491         <Endpoint><Address>{endpoint_address}</Address><Port>{port}</Port></Endpoint>\
2492         <AllocatedStorage>{allocated_storage}</AllocatedStorage>\
2493         <InstanceCreateTime>{create_time}</InstanceCreateTime>\
2494         <PreferredBackupWindow>{preferred_backup_window}</PreferredBackupWindow>\
2495         <BackupRetentionPeriod>{backup_retention_period}</BackupRetentionPeriod>\
2496         <DBSecurityGroups/>\
2497         {vpc_security_groups_xml}\
2498         {db_parameter_groups_xml}\
2499         <AvailabilityZone>us-east-1a</AvailabilityZone>\
2500         {latest_restorable_time_xml}\
2501         <PreferredMaintenanceWindow>sun:00:00-sun:00:30</PreferredMaintenanceWindow>\
2502         <MultiAZ>{multi_az}</MultiAZ>\
2503         <EngineVersion>{engine_version}</EngineVersion>\
2504         <AutoMinorVersionUpgrade>true</AutoMinorVersionUpgrade>\
2505         {read_replica_identifiers_xml}\
2506         {read_replica_source_xml}\
2507         <LicenseModel>{license_model}</LicenseModel>\
2508         {option_group_memberships_xml}\
2509         <PubliclyAccessible>{publicly_accessible}</PubliclyAccessible>\
2510         <StorageType>gp2</StorageType>\
2511         <DbInstancePort>{port}</DbInstancePort>\
2512         <StorageEncrypted>false</StorageEncrypted>\
2513         <DbiResourceId>{dbi_resource_id}</DbiResourceId>\
2514         <DeletionProtection>{deletion_protection}</DeletionProtection>\
2515         {pending_modified_values_xml}\
2516         <DBInstanceArn>{arn}</DBInstanceArn>",
2517        identifier = xml_escape(&instance.db_instance_identifier),
2518        class = xml_escape(&instance.db_instance_class),
2519        engine = xml_escape(&instance.engine),
2520        status = xml_escape(status),
2521        master_username = xml_escape(&instance.master_username),
2522        endpoint_address = xml_escape(&instance.endpoint_address),
2523        port = instance.port,
2524        allocated_storage = instance.allocated_storage,
2525        create_time = instance.created_at.to_rfc3339(),
2526        preferred_backup_window = xml_escape(&instance.preferred_backup_window),
2527        backup_retention_period = instance.backup_retention_period,
2528        multi_az = if instance.multi_az { "true" } else { "false" },
2529        engine_version = xml_escape(&instance.engine_version),
2530        license_model = license_model_for_engine(&instance.engine),
2531        publicly_accessible = if instance.publicly_accessible {
2532            "true"
2533        } else {
2534            "false"
2535        },
2536        dbi_resource_id = xml_escape(&instance.dbi_resource_id),
2537        deletion_protection = if instance.deletion_protection {
2538            "true"
2539        } else {
2540            "false"
2541        },
2542        arn = xml_escape(&instance.db_instance_arn),
2543    )
2544}
2545
2546fn db_snapshot_xml(snapshot: &DbSnapshot) -> String {
2547    format!(
2548        "<DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\
2549         <DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2550         <SnapshotCreateTime>{}</SnapshotCreateTime>\
2551         <Engine>{}</Engine>\
2552         <EngineVersion>{}</EngineVersion>\
2553         <AllocatedStorage>{}</AllocatedStorage>\
2554         <Status>{}</Status>\
2555         <Port>{}</Port>\
2556         <MasterUsername>{}</MasterUsername>\
2557         {}\
2558         <DbiResourceId>{}</DbiResourceId>\
2559         <SnapshotType>{}</SnapshotType>\
2560         <DBSnapshotArn>{}</DBSnapshotArn>",
2561        xml_escape(&snapshot.db_snapshot_identifier),
2562        xml_escape(&snapshot.db_instance_identifier),
2563        snapshot.snapshot_create_time.to_rfc3339(),
2564        xml_escape(&snapshot.engine),
2565        xml_escape(&snapshot.engine_version),
2566        snapshot.allocated_storage,
2567        xml_escape(&snapshot.status),
2568        snapshot.port,
2569        xml_escape(&snapshot.master_username),
2570        snapshot
2571            .db_name
2572            .as_ref()
2573            .map(|name| format!("<DBName>{}</DBName>", xml_escape(name)))
2574            .unwrap_or_default(),
2575        xml_escape(&snapshot.dbi_resource_id),
2576        xml_escape(&snapshot.snapshot_type),
2577        xml_escape(&snapshot.db_snapshot_arn),
2578    )
2579}
2580
2581fn db_subnet_group_xml(subnet_group: &DbSubnetGroup) -> String {
2582    let subnets_xml = subnet_group
2583        .subnet_ids
2584        .iter()
2585        .zip(&subnet_group.subnet_availability_zones)
2586        .map(|(subnet_id, az)| {
2587            format!(
2588                "<Subnet>\
2589                 <SubnetIdentifier>{}</SubnetIdentifier>\
2590                 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
2591                 <SubnetStatus>Active</SubnetStatus>\
2592                 </Subnet>",
2593                xml_escape(subnet_id),
2594                xml_escape(az)
2595            )
2596        })
2597        .collect::<String>();
2598
2599    format!(
2600        "<DBSubnetGroupName>{}</DBSubnetGroupName>\
2601         <DBSubnetGroupDescription>{}</DBSubnetGroupDescription>\
2602         <VpcId>{}</VpcId>\
2603         <SubnetGroupStatus>Complete</SubnetGroupStatus>\
2604         <Subnets>{}</Subnets>\
2605         <DBSubnetGroupArn>{}</DBSubnetGroupArn>",
2606        xml_escape(&subnet_group.db_subnet_group_name),
2607        xml_escape(&subnet_group.db_subnet_group_description),
2608        xml_escape(&subnet_group.vpc_id),
2609        subnets_xml,
2610        xml_escape(&subnet_group.db_subnet_group_arn),
2611    )
2612}
2613
2614fn db_parameter_group_xml(parameter_group: &DbParameterGroup) -> String {
2615    format!(
2616        "<DBParameterGroupName>{}</DBParameterGroupName>\
2617         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2618         <Description>{}</Description>\
2619         <DBParameterGroupArn>{}</DBParameterGroupArn>",
2620        xml_escape(&parameter_group.db_parameter_group_name),
2621        xml_escape(&parameter_group.db_parameter_group_family),
2622        xml_escape(&parameter_group.description),
2623        xml_escape(&parameter_group.db_parameter_group_arn),
2624    )
2625}
2626
2627fn db_instance_not_found(identifier: &str) -> AwsServiceError {
2628    AwsServiceError::aws_error(
2629        StatusCode::NOT_FOUND,
2630        "DBInstanceNotFound",
2631        format!("DBInstance {} not found.", identifier),
2632    )
2633}
2634
2635fn db_snapshot_not_found(identifier: &str) -> AwsServiceError {
2636    AwsServiceError::aws_error(
2637        StatusCode::NOT_FOUND,
2638        "DBSnapshotNotFound",
2639        format!("DBSnapshot {} not found.", identifier),
2640    )
2641}
2642
2643fn db_instance_not_found_by_arn(resource_name: &str) -> AwsServiceError {
2644    AwsServiceError::aws_error(
2645        StatusCode::NOT_FOUND,
2646        "DBInstanceNotFound",
2647        format!("DBInstance {resource_name} not found."),
2648    )
2649}
2650
2651fn find_instance_by_arn<'a>(
2652    state: &'a crate::state::RdsState,
2653    resource_name: &str,
2654) -> Result<&'a DbInstance, AwsServiceError> {
2655    state
2656        .instances
2657        .values()
2658        .find(|instance| instance.db_instance_arn == resource_name)
2659        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2660}
2661
2662fn find_instance_by_arn_mut<'a>(
2663    state: &'a mut crate::state::RdsState,
2664    resource_name: &str,
2665) -> Result<&'a mut DbInstance, AwsServiceError> {
2666    state
2667        .instances
2668        .values_mut()
2669        .find(|instance| instance.db_instance_arn == resource_name)
2670        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2671}
2672
2673fn merge_tags(existing: &mut Vec<RdsTag>, incoming: &[RdsTag]) {
2674    for tag in incoming {
2675        if let Some(existing_tag) = existing
2676            .iter_mut()
2677            .find(|candidate| candidate.key == tag.key)
2678        {
2679            existing_tag.value = tag.value.clone();
2680        } else {
2681            existing.push(tag.clone());
2682        }
2683    }
2684}
2685
2686fn license_model_for_engine(engine: &str) -> &'static str {
2687    match engine {
2688        "mysql" | "mariadb" => "general-public-license",
2689        _ => "postgresql-license",
2690    }
2691}
2692
2693fn default_db_name(engine: &str) -> &'static str {
2694    match engine {
2695        "mysql" | "mariadb" => "mysql",
2696        _ => "postgres",
2697    }
2698}
2699
2700/// Pick the port AWS defaults to for a freshly-created instance of
2701/// `engine`. PostgreSQL lives on 5432; MySQL and MariaDB share 3306.
2702fn default_port_for_engine(engine: &str) -> i32 {
2703    match engine {
2704        "postgres" => 5432,
2705        "mysql" | "mariadb" => 3306,
2706        _ => 5432,
2707    }
2708}
2709
2710/// Pick the built-in parameter group name AWS assigns to a new
2711/// instance when the caller doesn't override it. The name encodes the
2712/// engine family plus its major version (e.g. `default.postgres16`,
2713/// `default.mysql8.0`).
2714fn default_parameter_group(engine: &str, engine_version: &str) -> String {
2715    match engine {
2716        "postgres" => {
2717            let major = engine_version.split('.').next().unwrap_or("16");
2718            format!("default.postgres{}", major)
2719        }
2720        "mysql" => {
2721            let major = if engine_version.starts_with("5.7") {
2722                "5.7"
2723            } else {
2724                "8.0"
2725            };
2726            format!("default.mysql{}", major)
2727        }
2728        "mariadb" => {
2729            let major = if engine_version.starts_with("10.11") {
2730                "10.11"
2731            } else {
2732                "10.6"
2733            };
2734            format!("default.mariadb{}", major)
2735        }
2736        _ => "default.postgres16".to_string(),
2737    }
2738}
2739
2740fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
2741    match error {
2742        RuntimeError::Unavailable => AwsServiceError::aws_error(
2743            StatusCode::SERVICE_UNAVAILABLE,
2744            "InvalidParameterValue",
2745            "Docker/Podman is required for RDS DB instances but is not available",
2746        ),
2747        RuntimeError::ContainerStartFailed(message) => AwsServiceError::aws_error(
2748            StatusCode::INTERNAL_SERVER_ERROR,
2749            "InternalFailure",
2750            message,
2751        ),
2752    }
2753}
2754
2755#[cfg(test)]
2756mod tests {
2757    use std::collections::HashMap;
2758    use std::sync::Arc;
2759
2760    use bytes::Bytes;
2761    use chrono::Utc;
2762    use http::{HeaderMap, Method};
2763    use parking_lot::RwLock;
2764    use uuid::Uuid;
2765
2766    use super::{
2767        db_instance_xml, filter_engine_versions, filter_orderable_options, merge_tags,
2768        optional_i32_param, parse_tag_keys, parse_tags, validate_create_request, RdsService,
2769    };
2770    use crate::state::{default_engine_versions, default_orderable_options, DbInstance, RdsTag};
2771    use fakecloud_core::service::{AwsRequest, AwsService, AwsServiceError};
2772
2773    #[test]
2774    fn filter_engine_versions_matches_requested_engine() {
2775        let versions = default_engine_versions();
2776
2777        let filtered =
2778            filter_engine_versions(&versions, &Some("postgres".to_string()), &None, &None);
2779
2780        assert_eq!(filtered.len(), 4); // All postgres versions
2781        assert!(filtered.iter().all(|v| v.engine == "postgres"));
2782    }
2783
2784    #[test]
2785    fn filter_orderable_options_respects_instance_class() {
2786        let options = default_orderable_options();
2787
2788        let filtered = filter_orderable_options(
2789            &options,
2790            &Some("postgres".to_string()),
2791            &Some("16.3".to_string()),
2792            &Some("db.t3.micro".to_string()),
2793            &None,
2794            Some(true),
2795        );
2796
2797        assert_eq!(filtered.len(), 1);
2798        assert_eq!(filtered[0].db_instance_class, "db.t3.micro");
2799    }
2800
2801    #[test]
2802    fn validate_create_request_rejects_unsupported_engine() {
2803        let error = validate_create_request("test-db", 20, "db.t3.micro", "mysql", "16.3", 5432)
2804            .expect_err("unsupported engine");
2805
2806        assert_eq!(error.code(), "InvalidParameterValue");
2807    }
2808
2809    #[test]
2810    fn optional_i32_param_rejects_invalid_integer() {
2811        let request = request("CreateDBInstance", &[("Port", "not-a-number")]);
2812
2813        let error = optional_i32_param(&request, "Port").expect_err("invalid port");
2814
2815        assert_eq!(error.code(), "InvalidParameterValue");
2816    }
2817
2818    #[test]
2819    fn db_instance_xml_renders_endpoint_and_status() {
2820        let created_at = Utc::now();
2821        let instance = DbInstance {
2822            db_instance_identifier: "test-db".to_string(),
2823            db_instance_arn: "arn:aws:rds:us-east-1:123456789012:db:test-db".to_string(),
2824            db_instance_class: "db.t3.micro".to_string(),
2825            engine: "postgres".to_string(),
2826            engine_version: "16.3".to_string(),
2827            db_instance_status: "available".to_string(),
2828            master_username: "admin".to_string(),
2829            db_name: Some("appdb".to_string()),
2830            endpoint_address: "127.0.0.1".to_string(),
2831            port: 15432,
2832            allocated_storage: 20,
2833            publicly_accessible: true,
2834            deletion_protection: false,
2835            created_at,
2836            dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
2837            master_user_password: "secret123".to_string(),
2838            container_id: "container".to_string(),
2839            host_port: 15432,
2840            tags: Vec::new(),
2841            read_replica_source_db_instance_identifier: None,
2842            read_replica_db_instance_identifiers: Vec::new(),
2843            vpc_security_group_ids: vec!["sg-12345678".to_string()],
2844            db_parameter_group_name: Some("default.postgres16".to_string()),
2845            backup_retention_period: 1,
2846            preferred_backup_window: "03:00-04:00".to_string(),
2847            latest_restorable_time: Some(created_at),
2848            option_group_name: None,
2849            multi_az: false,
2850            pending_modified_values: None,
2851        };
2852
2853        let xml = db_instance_xml(&instance, Some("creating"));
2854
2855        assert!(xml.contains("<DBInstanceIdentifier>test-db</DBInstanceIdentifier>"));
2856        assert!(xml.contains("<DBInstanceStatus>creating</DBInstanceStatus>"));
2857        assert!(xml.contains("<Address>127.0.0.1</Address><Port>15432</Port>"));
2858    }
2859
2860    #[test]
2861    fn parse_tags_reads_rds_query_shape() {
2862        let request = request(
2863            "AddTagsToResource",
2864            &[
2865                ("Tags.Tag.1.Key", "env"),
2866                ("Tags.Tag.1.Value", "dev"),
2867                ("Tags.Tag.2.Key", "team"),
2868                ("Tags.Tag.2.Value", "core"),
2869            ],
2870        );
2871
2872        let tags = parse_tags(&request).expect("tags");
2873
2874        assert_eq!(
2875            tags,
2876            vec![
2877                RdsTag {
2878                    key: "env".to_string(),
2879                    value: "dev".to_string(),
2880                },
2881                RdsTag {
2882                    key: "team".to_string(),
2883                    value: "core".to_string(),
2884                }
2885            ]
2886        );
2887    }
2888
2889    #[test]
2890    fn parse_tag_keys_reads_member_shape() {
2891        let request = request(
2892            "RemoveTagsFromResource",
2893            &[("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")],
2894        );
2895
2896        let tag_keys = parse_tag_keys(&request).expect("tag keys");
2897
2898        assert_eq!(tag_keys, vec!["env".to_string(), "team".to_string()]);
2899    }
2900
2901    #[test]
2902    fn merge_tags_updates_existing_values() {
2903        let mut tags = vec![RdsTag {
2904            key: "env".to_string(),
2905            value: "dev".to_string(),
2906        }];
2907
2908        merge_tags(
2909            &mut tags,
2910            &[
2911                RdsTag {
2912                    key: "env".to_string(),
2913                    value: "prod".to_string(),
2914                },
2915                RdsTag {
2916                    key: "team".to_string(),
2917                    value: "core".to_string(),
2918                },
2919            ],
2920        );
2921
2922        assert_eq!(tags.len(), 2);
2923        assert_eq!(tags[0].value, "prod");
2924        assert_eq!(tags[1].key, "team");
2925    }
2926
2927    #[tokio::test]
2928    async fn describe_engine_versions_returns_xml_body() {
2929        let service = RdsService::new(Arc::new(RwLock::new(
2930            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
2931        )));
2932        let request = request("DescribeDBEngineVersions", &[("Engine", "postgres")]);
2933
2934        let response = service.handle(request).await.expect("response");
2935        let body = String::from_utf8(response.body.expect_bytes().to_vec()).expect("utf8");
2936
2937        assert!(body.contains("<DescribeDBEngineVersionsResponse"));
2938        assert!(body.contains("<Engine>postgres</Engine>"));
2939        assert!(body.contains("<DBParameterGroupFamily>postgres16</DBParameterGroupFamily>"));
2940    }
2941
2942    fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
2943        let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
2944        for (key, value) in params {
2945            query_params.insert((*key).to_string(), (*value).to_string());
2946        }
2947
2948        AwsRequest {
2949            service: "rds".to_string(),
2950            action: action.to_string(),
2951            region: "us-east-1".to_string(),
2952            account_id: "123456789012".to_string(),
2953            request_id: "test-request-id".to_string(),
2954            headers: HeaderMap::new(),
2955            query_params,
2956            body: Bytes::new(),
2957            path_segments: vec![],
2958            raw_path: "/".to_string(),
2959            raw_query: String::new(),
2960            method: Method::POST,
2961            is_query_protocol: true,
2962            access_key_id: None,
2963            principal: None,
2964        }
2965    }
2966
2967    // ── Helpers for handler tests ────────────────────────────────────
2968
2969    fn make_service() -> RdsService {
2970        RdsService::new(Arc::new(RwLock::new(
2971            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
2972        )))
2973    }
2974
2975    fn body_of(resp: fakecloud_core::service::AwsResponse) -> String {
2976        String::from_utf8(resp.body.expect_bytes().to_vec()).expect("utf8")
2977    }
2978
2979    fn seed_instance(svc: &RdsService, identifier: &str) -> String {
2980        let arn = format!("arn:aws:rds:us-east-1:123456789012:db:{identifier}");
2981        let mut accounts = svc.state.write();
2982        let state = accounts.default_mut();
2983        state.instances.insert(
2984            identifier.to_string(),
2985            DbInstance {
2986                db_instance_identifier: identifier.to_string(),
2987                db_instance_arn: arn.clone(),
2988                db_instance_class: "db.t3.micro".to_string(),
2989                engine: "postgres".to_string(),
2990                engine_version: "16.3".to_string(),
2991                db_instance_status: "available".to_string(),
2992                master_username: "admin".to_string(),
2993                db_name: Some("appdb".to_string()),
2994                endpoint_address: "127.0.0.1".to_string(),
2995                port: 15432,
2996                allocated_storage: 20,
2997                publicly_accessible: true,
2998                deletion_protection: false,
2999                created_at: Utc::now(),
3000                dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3001                master_user_password: "secret".to_string(),
3002                container_id: "container".to_string(),
3003                host_port: 15432,
3004                tags: Vec::new(),
3005                read_replica_source_db_instance_identifier: None,
3006                read_replica_db_instance_identifiers: Vec::new(),
3007                vpc_security_group_ids: vec!["sg-12345678".to_string()],
3008                db_parameter_group_name: Some("default.postgres16".to_string()),
3009                backup_retention_period: 1,
3010                preferred_backup_window: "03:00-04:00".to_string(),
3011                latest_restorable_time: None,
3012                option_group_name: None,
3013                multi_az: false,
3014                pending_modified_values: None,
3015            },
3016        );
3017        arn
3018    }
3019
3020    fn assert_code<T>(result: Result<T, AwsServiceError>, expected_code: &str) -> AwsServiceError {
3021        match result {
3022            Ok(_) => panic!("expected error {expected_code}, got Ok"),
3023            Err(e) => {
3024                assert_eq!(e.code(), expected_code, "wrong error code");
3025                e
3026            }
3027        }
3028    }
3029
3030    // ── Tag operations ───────────────────────────────────────────────
3031
3032    #[test]
3033    fn add_tags_requires_resource_name() {
3034        let svc = make_service();
3035        let req = request("AddTagsToResource", &[]);
3036        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3037    }
3038
3039    #[test]
3040    fn add_tags_requires_at_least_one_tag() {
3041        let svc = make_service();
3042        let arn = seed_instance(&svc, "db1");
3043        let req = request("AddTagsToResource", &[("ResourceName", arn.as_str())]);
3044        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3045    }
3046
3047    #[test]
3048    fn add_tags_appends_then_list_tags_returns_them() {
3049        let svc = make_service();
3050        let arn = seed_instance(&svc, "db1");
3051        let add_req = request(
3052            "AddTagsToResource",
3053            &[
3054                ("ResourceName", arn.as_str()),
3055                ("Tags.Tag.1.Key", "env"),
3056                ("Tags.Tag.1.Value", "dev"),
3057            ],
3058        );
3059        svc.add_tags_to_resource(&add_req).unwrap();
3060
3061        let list_req = request("ListTagsForResource", &[("ResourceName", arn.as_str())]);
3062        let body = body_of(svc.list_tags_for_resource(&list_req).unwrap());
3063        assert!(body.contains("<Key>env</Key>"));
3064        assert!(body.contains("<Value>dev</Value>"));
3065    }
3066
3067    #[test]
3068    fn list_tags_rejects_filters_param() {
3069        let svc = make_service();
3070        let arn = seed_instance(&svc, "db1");
3071        let req = request(
3072            "ListTagsForResource",
3073            &[
3074                ("ResourceName", arn.as_str()),
3075                ("Filters.Filter.1.Name", "x"),
3076            ],
3077        );
3078        assert_code(svc.list_tags_for_resource(&req), "InvalidParameterValue");
3079    }
3080
3081    #[test]
3082    fn list_tags_unknown_arn_errors() {
3083        let svc = make_service();
3084        let req = request(
3085            "ListTagsForResource",
3086            &[("ResourceName", "arn:aws:rds:us-east-1:123456789012:db:nope")],
3087        );
3088        assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
3089    }
3090
3091    #[test]
3092    fn remove_tags_strips_only_listed_keys() {
3093        let svc = make_service();
3094        let arn = seed_instance(&svc, "db1");
3095        {
3096            let mut __a = svc.state.write();
3097            let state = __a.default_mut();
3098            let inst = state.instances.get_mut("db1").unwrap();
3099            inst.tags = vec![
3100                RdsTag {
3101                    key: "env".to_string(),
3102                    value: "dev".to_string(),
3103                },
3104                RdsTag {
3105                    key: "team".to_string(),
3106                    value: "core".to_string(),
3107                },
3108            ];
3109        }
3110        let req = request(
3111            "RemoveTagsFromResource",
3112            &[("ResourceName", arn.as_str()), ("TagKeys.member.1", "env")],
3113        );
3114        svc.remove_tags_from_resource(&req).unwrap();
3115
3116        let __a = svc.state.read();
3117        let state = __a.default_ref();
3118        let tags = &state.instances.get("db1").unwrap().tags;
3119        assert_eq!(tags.len(), 1);
3120        assert_eq!(tags[0].key, "team");
3121    }
3122
3123    #[test]
3124    fn remove_tags_requires_keys() {
3125        let svc = make_service();
3126        let arn = seed_instance(&svc, "db1");
3127        let req = request("RemoveTagsFromResource", &[("ResourceName", arn.as_str())]);
3128        assert_code(svc.remove_tags_from_resource(&req), "MissingParameter");
3129    }
3130
3131    // ── DB Subnet Groups ─────────────────────────────────────────────
3132
3133    fn create_subnet_group(svc: &RdsService, name: &str) {
3134        let req = request(
3135            "CreateDBSubnetGroup",
3136            &[
3137                ("DBSubnetGroupName", name),
3138                ("DBSubnetGroupDescription", "test"),
3139                ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3140                ("SubnetIds.SubnetIdentifier.2", "subnet-bbb"),
3141            ],
3142        );
3143        svc.create_db_subnet_group(&req).unwrap();
3144    }
3145
3146    #[test]
3147    fn create_db_subnet_group_requires_two_subnets() {
3148        let svc = make_service();
3149        let req = request(
3150            "CreateDBSubnetGroup",
3151            &[
3152                ("DBSubnetGroupName", "sg1"),
3153                ("DBSubnetGroupDescription", "t"),
3154                ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3155            ],
3156        );
3157        assert_code(
3158            svc.create_db_subnet_group(&req),
3159            "DBSubnetGroupDoesNotCoverEnoughAZs",
3160        );
3161    }
3162
3163    #[test]
3164    fn create_db_subnet_group_rejects_empty_subnets() {
3165        let svc = make_service();
3166        let req = request(
3167            "CreateDBSubnetGroup",
3168            &[
3169                ("DBSubnetGroupName", "sg1"),
3170                ("DBSubnetGroupDescription", "t"),
3171            ],
3172        );
3173        assert_code(svc.create_db_subnet_group(&req), "InvalidParameterValue");
3174    }
3175
3176    #[test]
3177    fn create_db_subnet_group_rejects_duplicates() {
3178        let svc = make_service();
3179        create_subnet_group(&svc, "sg1");
3180        let req = request(
3181            "CreateDBSubnetGroup",
3182            &[
3183                ("DBSubnetGroupName", "sg1"),
3184                ("DBSubnetGroupDescription", "t"),
3185                ("SubnetIds.SubnetIdentifier.1", "subnet-x"),
3186                ("SubnetIds.SubnetIdentifier.2", "subnet-y"),
3187            ],
3188        );
3189        assert_code(
3190            svc.create_db_subnet_group(&req),
3191            "DBSubnetGroupAlreadyExists",
3192        );
3193    }
3194
3195    #[test]
3196    fn describe_db_subnet_groups_by_name_or_list() {
3197        let svc = make_service();
3198        create_subnet_group(&svc, "sg-alpha");
3199        create_subnet_group(&svc, "sg-beta");
3200
3201        let by_name = request(
3202            "DescribeDBSubnetGroups",
3203            &[("DBSubnetGroupName", "sg-alpha")],
3204        );
3205        let body = body_of(svc.describe_db_subnet_groups(&by_name).unwrap());
3206        assert!(body.contains("sg-alpha"));
3207        assert!(!body.contains("sg-beta"));
3208
3209        let list_all = request("DescribeDBSubnetGroups", &[]);
3210        let body = body_of(svc.describe_db_subnet_groups(&list_all).unwrap());
3211        assert!(body.contains("sg-alpha"));
3212        assert!(body.contains("sg-beta"));
3213    }
3214
3215    #[test]
3216    fn describe_db_subnet_groups_unknown_name_errors() {
3217        let svc = make_service();
3218        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
3219        assert_code(
3220            svc.describe_db_subnet_groups(&req),
3221            "DBSubnetGroupNotFoundFault",
3222        );
3223    }
3224
3225    #[test]
3226    fn delete_db_subnet_group_unknown_errors() {
3227        let svc = make_service();
3228        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
3229        assert_code(
3230            svc.delete_db_subnet_group(&req),
3231            "DBSubnetGroupNotFoundFault",
3232        );
3233    }
3234
3235    #[test]
3236    fn delete_db_subnet_group_removes_entry() {
3237        let svc = make_service();
3238        create_subnet_group(&svc, "sg1");
3239        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "sg1")]);
3240        svc.delete_db_subnet_group(&req).unwrap();
3241        assert!(svc.state.read().default_ref().subnet_groups.is_empty());
3242    }
3243
3244    #[test]
3245    fn modify_db_subnet_group_updates_subnet_ids() {
3246        let svc = make_service();
3247        create_subnet_group(&svc, "sg1");
3248        let req = request(
3249            "ModifyDBSubnetGroup",
3250            &[
3251                ("DBSubnetGroupName", "sg1"),
3252                ("SubnetIds.SubnetIdentifier.1", "subnet-new1"),
3253                ("SubnetIds.SubnetIdentifier.2", "subnet-new2"),
3254            ],
3255        );
3256        svc.modify_db_subnet_group(&req).unwrap();
3257
3258        let __a = svc.state.read();
3259        let state = __a.default_ref();
3260        let sg = state.subnet_groups.get("sg1").unwrap();
3261        assert_eq!(sg.subnet_ids, vec!["subnet-new1", "subnet-new2"]);
3262    }
3263
3264    // ── DB Parameter Groups ──────────────────────────────────────────
3265
3266    fn create_param_group(svc: &RdsService, name: &str) {
3267        let req = request(
3268            "CreateDBParameterGroup",
3269            &[
3270                ("DBParameterGroupName", name),
3271                ("DBParameterGroupFamily", "postgres16"),
3272                ("Description", "test"),
3273            ],
3274        );
3275        svc.create_db_parameter_group(&req).unwrap();
3276    }
3277
3278    #[test]
3279    fn create_db_parameter_group_rejects_unknown_family() {
3280        let svc = make_service();
3281        let req = request(
3282            "CreateDBParameterGroup",
3283            &[
3284                ("DBParameterGroupName", "pg1"),
3285                ("DBParameterGroupFamily", "oracle19"),
3286                ("Description", "t"),
3287            ],
3288        );
3289        assert_code(svc.create_db_parameter_group(&req), "InvalidParameterValue");
3290    }
3291
3292    #[test]
3293    fn create_db_parameter_group_rejects_duplicates() {
3294        let svc = make_service();
3295        create_param_group(&svc, "pg1");
3296        let req = request(
3297            "CreateDBParameterGroup",
3298            &[
3299                ("DBParameterGroupName", "pg1"),
3300                ("DBParameterGroupFamily", "postgres16"),
3301                ("Description", "t"),
3302            ],
3303        );
3304        assert_code(
3305            svc.create_db_parameter_group(&req),
3306            "DBParameterGroupAlreadyExists",
3307        );
3308    }
3309
3310    #[test]
3311    fn describe_db_parameter_groups_by_name_or_list() {
3312        let svc = make_service();
3313        create_param_group(&svc, "pg-alpha");
3314        create_param_group(&svc, "pg-beta");
3315        let by_name = request(
3316            "DescribeDBParameterGroups",
3317            &[("DBParameterGroupName", "pg-alpha")],
3318        );
3319        let body = body_of(svc.describe_db_parameter_groups(&by_name).unwrap());
3320        assert!(body.contains("pg-alpha"));
3321        assert!(!body.contains("pg-beta"));
3322        let list = request("DescribeDBParameterGroups", &[]);
3323        let body = body_of(svc.describe_db_parameter_groups(&list).unwrap());
3324        assert!(body.contains("pg-alpha"));
3325        assert!(body.contains("pg-beta"));
3326    }
3327
3328    #[test]
3329    fn describe_db_parameter_groups_unknown_name_errors() {
3330        let svc = make_service();
3331        let req = request(
3332            "DescribeDBParameterGroups",
3333            &[("DBParameterGroupName", "ghost")],
3334        );
3335        assert_code(
3336            svc.describe_db_parameter_groups(&req),
3337            "DBParameterGroupNotFound",
3338        );
3339    }
3340
3341    #[test]
3342    fn delete_db_parameter_group_rejects_default_groups() {
3343        let svc = make_service();
3344        let req = request(
3345            "DeleteDBParameterGroup",
3346            &[("DBParameterGroupName", "default.postgres16")],
3347        );
3348        assert_code(svc.delete_db_parameter_group(&req), "InvalidParameterValue");
3349    }
3350
3351    #[test]
3352    fn delete_db_parameter_group_unknown_errors() {
3353        let svc = make_service();
3354        let req = request(
3355            "DeleteDBParameterGroup",
3356            &[("DBParameterGroupName", "ghost")],
3357        );
3358        assert_code(
3359            svc.delete_db_parameter_group(&req),
3360            "DBParameterGroupNotFound",
3361        );
3362    }
3363
3364    #[test]
3365    fn delete_db_parameter_group_removes_entry() {
3366        let svc = make_service();
3367        create_param_group(&svc, "pg1");
3368        let req = request("DeleteDBParameterGroup", &[("DBParameterGroupName", "pg1")]);
3369        svc.delete_db_parameter_group(&req).unwrap();
3370        assert!(!svc
3371            .state
3372            .read()
3373            .default_ref()
3374            .parameter_groups
3375            .contains_key("pg1"));
3376    }
3377
3378    #[test]
3379    fn modify_db_parameter_group_updates_description() {
3380        let svc = make_service();
3381        create_param_group(&svc, "pg1");
3382        let req = request(
3383            "ModifyDBParameterGroup",
3384            &[
3385                ("DBParameterGroupName", "pg1"),
3386                ("Description", "shiny new"),
3387            ],
3388        );
3389        svc.modify_db_parameter_group(&req).unwrap();
3390        let __a = svc.state.read();
3391        let state = __a.default_ref();
3392        assert_eq!(
3393            state.parameter_groups.get("pg1").unwrap().description,
3394            "shiny new"
3395        );
3396    }
3397
3398    #[test]
3399    fn modify_db_parameter_group_unknown_errors() {
3400        let svc = make_service();
3401        let req = request(
3402            "ModifyDBParameterGroup",
3403            &[("DBParameterGroupName", "ghost"), ("Description", "x")],
3404        );
3405        assert_code(
3406            svc.modify_db_parameter_group(&req),
3407            "DBParameterGroupNotFound",
3408        );
3409    }
3410
3411    // ── DescribeDBInstances ──────────────────────────────────────────
3412
3413    #[test]
3414    fn describe_db_instances_by_id_returns_only_one() {
3415        let svc = make_service();
3416        seed_instance(&svc, "db1");
3417        seed_instance(&svc, "db2");
3418        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "db1")]);
3419        let body = body_of(svc.describe_db_instances(&req).unwrap());
3420        assert!(body.contains("<DBInstanceIdentifier>db1</DBInstanceIdentifier>"));
3421        assert!(!body.contains("<DBInstanceIdentifier>db2</DBInstanceIdentifier>"));
3422    }
3423
3424    #[test]
3425    fn describe_db_instances_unknown_id_errors() {
3426        let svc = make_service();
3427        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
3428        assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
3429    }
3430
3431    #[test]
3432    fn describe_db_instances_lists_all_when_unbounded() {
3433        let svc = make_service();
3434        seed_instance(&svc, "db1");
3435        seed_instance(&svc, "db2");
3436        seed_instance(&svc, "db3");
3437        let req = request("DescribeDBInstances", &[]);
3438        let body = body_of(svc.describe_db_instances(&req).unwrap());
3439        for id in ["db1", "db2", "db3"] {
3440            assert!(body.contains(&format!(
3441                "<DBInstanceIdentifier>{id}</DBInstanceIdentifier>"
3442            )));
3443        }
3444    }
3445
3446    // ── ModifyDBInstance ─────────────────────────────────────────────
3447
3448    #[test]
3449    fn modify_db_instance_requires_at_least_one_change() {
3450        let svc = make_service();
3451        seed_instance(&svc, "db1");
3452        let req = request("ModifyDBInstance", &[("DBInstanceIdentifier", "db1")]);
3453        assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
3454    }
3455
3456    #[test]
3457    fn modify_db_instance_unknown_errors() {
3458        let svc = make_service();
3459        let req = request(
3460            "ModifyDBInstance",
3461            &[
3462                ("DBInstanceIdentifier", "ghost"),
3463                ("DBInstanceClass", "db.t3.small"),
3464            ],
3465        );
3466        assert_code(svc.modify_db_instance(&req), "DBInstanceNotFound");
3467    }
3468
3469    #[test]
3470    fn modify_db_instance_apply_immediately_updates_class() {
3471        let svc = make_service();
3472        seed_instance(&svc, "db1");
3473        let req = request(
3474            "ModifyDBInstance",
3475            &[
3476                ("DBInstanceIdentifier", "db1"),
3477                ("DBInstanceClass", "db.t3.small"),
3478                ("ApplyImmediately", "true"),
3479            ],
3480        );
3481        svc.modify_db_instance(&req).unwrap();
3482        let __a = svc.state.read();
3483        let state = __a.default_ref();
3484        assert_eq!(
3485            state.instances.get("db1").unwrap().db_instance_class,
3486            "db.t3.small"
3487        );
3488    }
3489
3490    #[test]
3491    fn modify_db_instance_pending_when_not_apply_immediately() {
3492        let svc = make_service();
3493        seed_instance(&svc, "db1");
3494        let req = request(
3495            "ModifyDBInstance",
3496            &[
3497                ("DBInstanceIdentifier", "db1"),
3498                ("DBInstanceClass", "db.t3.small"),
3499                ("ApplyImmediately", "false"),
3500            ],
3501        );
3502        svc.modify_db_instance(&req).unwrap();
3503        let __a = svc.state.read();
3504        let state = __a.default_ref();
3505        let inst = state.instances.get("db1").unwrap();
3506        assert_eq!(inst.db_instance_class, "db.t3.micro");
3507        assert_eq!(
3508            inst.pending_modified_values
3509                .as_ref()
3510                .unwrap()
3511                .db_instance_class
3512                .as_deref(),
3513            Some("db.t3.small"),
3514        );
3515    }
3516
3517    // ── Snapshots (sync ops only) ────────────────────────────────────
3518
3519    fn seed_snapshot(svc: &RdsService, snapshot_id: &str, instance_id: &str) {
3520        let mut __a = svc.state.write();
3521        let state = __a.default_mut();
3522        let arn = state.db_snapshot_arn(snapshot_id);
3523        state.snapshots.insert(
3524            snapshot_id.to_string(),
3525            crate::state::DbSnapshot {
3526                db_snapshot_identifier: snapshot_id.to_string(),
3527                db_snapshot_arn: arn,
3528                db_instance_identifier: instance_id.to_string(),
3529                snapshot_create_time: Utc::now(),
3530                engine: "postgres".to_string(),
3531                engine_version: "16.3".to_string(),
3532                allocated_storage: 20,
3533                status: "available".to_string(),
3534                port: 5432,
3535                master_username: "admin".to_string(),
3536                db_name: Some("appdb".to_string()),
3537                dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3538                snapshot_type: "manual".to_string(),
3539                master_user_password: "secret".to_string(),
3540                tags: Vec::new(),
3541                dump_data: Vec::new(),
3542            },
3543        );
3544    }
3545
3546    #[test]
3547    fn delete_db_snapshot_removes_entry() {
3548        let svc = make_service();
3549        seed_snapshot(&svc, "snap1", "db1");
3550        let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "snap1")]);
3551        svc.delete_db_snapshot(&req).unwrap();
3552        assert!(svc.state.read().default_ref().snapshots.is_empty());
3553    }
3554
3555    #[test]
3556    fn delete_db_snapshot_unknown_errors() {
3557        let svc = make_service();
3558        let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "ghost")]);
3559        assert_code(svc.delete_db_snapshot(&req), "DBSnapshotNotFound");
3560    }
3561
3562    #[test]
3563    fn describe_db_snapshots_rejects_both_filters() {
3564        let svc = make_service();
3565        let req = request(
3566            "DescribeDBSnapshots",
3567            &[("DBSnapshotIdentifier", "s"), ("DBInstanceIdentifier", "i")],
3568        );
3569        assert_code(
3570            svc.describe_db_snapshots(&req),
3571            "InvalidParameterCombination",
3572        );
3573    }
3574
3575    #[test]
3576    fn describe_db_snapshots_by_id_or_instance() {
3577        let svc = make_service();
3578        seed_snapshot(&svc, "snap1", "db1");
3579        seed_snapshot(&svc, "snap2", "db2");
3580
3581        let by_id = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "snap1")]);
3582        let body = body_of(svc.describe_db_snapshots(&by_id).unwrap());
3583        assert!(body.contains("snap1"));
3584        assert!(!body.contains("snap2"));
3585
3586        let by_instance = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "db2")]);
3587        let body = body_of(svc.describe_db_snapshots(&by_instance).unwrap());
3588        assert!(body.contains("snap2"));
3589        assert!(!body.contains("snap1"));
3590
3591        let list_all = request("DescribeDBSnapshots", &[]);
3592        let body = body_of(svc.describe_db_snapshots(&list_all).unwrap());
3593        assert!(body.contains("snap1"));
3594        assert!(body.contains("snap2"));
3595    }
3596
3597    #[test]
3598    fn describe_db_snapshots_unknown_id_errors() {
3599        let svc = make_service();
3600        let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "ghost")]);
3601        assert_code(svc.describe_db_snapshots(&req), "DBSnapshotNotFound");
3602    }
3603
3604    // ── Error branch tests ──
3605
3606    #[test]
3607    fn describe_db_instances_not_found() {
3608        let svc = make_service();
3609        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
3610        assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
3611    }
3612
3613    #[tokio::test]
3614    async fn delete_db_instance_not_found() {
3615        let svc = make_service();
3616        let req = request(
3617            "DeleteDBInstance",
3618            &[
3619                ("DBInstanceIdentifier", "ghost"),
3620                ("SkipFinalSnapshot", "true"),
3621            ],
3622        );
3623        assert_code(svc.delete_db_instance(&req).await, "DBInstanceNotFound");
3624    }
3625
3626    #[test]
3627    fn modify_db_instance_not_found() {
3628        let svc = make_service();
3629        let req = request(
3630            "ModifyDBInstance",
3631            &[
3632                ("DBInstanceIdentifier", "ghost"),
3633                ("AllocatedStorage", "20"),
3634            ],
3635        );
3636        // Validation fires before existence check
3637        assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
3638    }
3639
3640    #[tokio::test]
3641    async fn reboot_db_instance_not_found() {
3642        let svc = make_service();
3643        let req = request("RebootDBInstance", &[("DBInstanceIdentifier", "ghost")]);
3644        assert_code(svc.reboot_db_instance(&req).await, "DBInstanceNotFound");
3645    }
3646
3647    #[tokio::test]
3648    async fn create_db_snapshot_instance_not_found() {
3649        let svc = make_service();
3650        let req = request(
3651            "CreateDBSnapshot",
3652            &[
3653                ("DBInstanceIdentifier", "ghost"),
3654                ("DBSnapshotIdentifier", "snap1"),
3655            ],
3656        );
3657        assert_code(svc.create_db_snapshot(&req).await, "InvalidParameterValue");
3658    }
3659
3660    #[tokio::test]
3661    async fn restore_db_instance_snapshot_not_found() {
3662        let svc = make_service();
3663        let req = request(
3664            "RestoreDBInstanceFromDBSnapshot",
3665            &[
3666                ("DBInstanceIdentifier", "restored"),
3667                ("DBSnapshotIdentifier", "ghost-snap"),
3668            ],
3669        );
3670        assert_code(
3671            svc.restore_db_instance_from_db_snapshot(&req).await,
3672            "InvalidParameterValue",
3673        );
3674    }
3675
3676    #[tokio::test]
3677    async fn create_db_instance_read_replica_source_not_found() {
3678        let svc = make_service();
3679        let req = request(
3680            "CreateDBInstanceReadReplica",
3681            &[
3682                ("DBInstanceIdentifier", "replica"),
3683                ("SourceDBInstanceIdentifier", "ghost"),
3684            ],
3685        );
3686        assert_code(
3687            svc.create_db_instance_read_replica(&req).await,
3688            "InvalidParameterValue",
3689        );
3690    }
3691
3692    #[test]
3693    fn describe_db_engine_versions_basic() {
3694        let svc = make_service();
3695        let req = request("DescribeDBEngineVersions", &[]);
3696        let resp = svc.describe_db_engine_versions(&req).unwrap();
3697        let body = body_of(resp);
3698        assert!(body.contains("<DBEngineVersions>"));
3699    }
3700
3701    #[test]
3702    fn describe_orderable_db_instance_options_basic() {
3703        let svc = make_service();
3704        let req = request("DescribeOrderableDBInstanceOptions", &[("Engine", "mysql")]);
3705        let resp = svc.describe_orderable_db_instance_options(&req).unwrap();
3706        let body = body_of(resp);
3707        assert!(body.contains("<OrderableDBInstanceOptions>"));
3708    }
3709
3710    #[test]
3711    fn describe_db_parameter_group_not_found() {
3712        let svc = make_service();
3713        let req = request(
3714            "DescribeDBParameterGroups",
3715            &[("DBParameterGroupName", "ghost")],
3716        );
3717        assert_code(
3718            svc.describe_db_parameter_groups(&req),
3719            "DBParameterGroupNotFound",
3720        );
3721    }
3722
3723    #[test]
3724    fn delete_db_parameter_group_not_found() {
3725        let svc = make_service();
3726        let req = request(
3727            "DeleteDBParameterGroup",
3728            &[("DBParameterGroupName", "ghost")],
3729        );
3730        assert_code(
3731            svc.delete_db_parameter_group(&req),
3732            "DBParameterGroupNotFound",
3733        );
3734    }
3735
3736    #[test]
3737    fn describe_db_subnet_group_not_found() {
3738        let svc = make_service();
3739        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
3740        assert_code(
3741            svc.describe_db_subnet_groups(&req),
3742            "DBSubnetGroupNotFoundFault",
3743        );
3744    }
3745
3746    #[test]
3747    fn delete_db_subnet_group_not_found() {
3748        let svc = make_service();
3749        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
3750        assert_code(
3751            svc.delete_db_subnet_group(&req),
3752            "DBSubnetGroupNotFoundFault",
3753        );
3754    }
3755
3756    #[test]
3757    fn add_tags_resource_not_found() {
3758        let svc = make_service();
3759        let req = request(
3760            "AddTagsToResource",
3761            &[
3762                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
3763                ("Tags.member.1.Key", "k"),
3764                ("Tags.member.1.Value", "v"),
3765            ],
3766        );
3767        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3768    }
3769
3770    #[test]
3771    fn list_tags_resource_not_found() {
3772        let svc = make_service();
3773        let req = request(
3774            "ListTagsForResource",
3775            &[("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost")],
3776        );
3777        assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
3778    }
3779
3780    // ── snapshot operations ──
3781
3782    #[tokio::test]
3783    async fn create_db_snapshot_missing_id_errors() {
3784        let svc = make_service();
3785        let req = request(
3786            "CreateDBSnapshot",
3787            &[("DBInstanceIdentifier", "nonexistent")],
3788        );
3789        assert_code(svc.create_db_snapshot(&req).await, "MissingParameter");
3790    }
3791
3792    #[tokio::test]
3793    async fn create_db_snapshot_unknown_instance_errors() {
3794        let svc = make_service();
3795        let req = request(
3796            "CreateDBSnapshot",
3797            &[
3798                ("DBSnapshotIdentifier", "snap1"),
3799                ("DBInstanceIdentifier", "ghost"),
3800            ],
3801        );
3802        assert!(svc.create_db_snapshot(&req).await.is_err());
3803    }
3804
3805    // ── delete_db_instance ──
3806
3807    #[tokio::test]
3808    async fn delete_db_instance_missing_id_errors() {
3809        let svc = make_service();
3810        let req = request("DeleteDBInstance", &[]);
3811        assert_code(svc.delete_db_instance(&req).await, "MissingParameter");
3812    }
3813
3814    // ── reboot_db_instance ──
3815
3816    #[tokio::test]
3817    async fn reboot_db_instance_missing_id_errors() {
3818        let svc = make_service();
3819        let req = request("RebootDBInstance", &[]);
3820        assert_code(svc.reboot_db_instance(&req).await, "MissingParameter");
3821    }
3822
3823    // ── create_db_instance validation ──
3824
3825    #[tokio::test]
3826    async fn create_db_instance_missing_id_errors() {
3827        let svc = make_service();
3828        let req = request(
3829            "CreateDBInstance",
3830            &[
3831                ("Engine", "postgres"),
3832                ("DBInstanceClass", "db.t3.micro"),
3833                ("AllocatedStorage", "20"),
3834                ("MasterUsername", "admin"),
3835                ("MasterUserPassword", "secretpass"),
3836            ],
3837        );
3838        assert!(svc.create_db_instance(&req).await.is_err());
3839    }
3840
3841    #[tokio::test]
3842    async fn create_db_instance_unsupported_engine_errors() {
3843        let svc = make_service();
3844        let req = request(
3845            "CreateDBInstance",
3846            &[
3847                ("DBInstanceIdentifier", "db1"),
3848                ("Engine", "mongodb"),
3849                ("DBInstanceClass", "db.t3.micro"),
3850                ("AllocatedStorage", "20"),
3851                ("MasterUsername", "admin"),
3852                ("MasterUserPassword", "secretpass"),
3853            ],
3854        );
3855        assert!(svc.create_db_instance(&req).await.is_err());
3856    }
3857
3858    // ── restore_db_instance_from_db_snapshot ──
3859
3860    #[tokio::test]
3861    async fn restore_db_instance_missing_ids_errors() {
3862        let svc = make_service();
3863        let req = request("RestoreDBInstanceFromDBSnapshot", &[]);
3864        assert!(svc
3865            .restore_db_instance_from_db_snapshot(&req)
3866            .await
3867            .is_err());
3868    }
3869
3870    #[tokio::test]
3871    async fn restore_db_instance_unknown_snapshot_errors() {
3872        let svc = make_service();
3873        let req = request(
3874            "RestoreDBInstanceFromDBSnapshot",
3875            &[
3876                ("DBInstanceIdentifier", "restored"),
3877                ("DBSnapshotIdentifier", "missing"),
3878            ],
3879        );
3880        assert!(svc
3881            .restore_db_instance_from_db_snapshot(&req)
3882            .await
3883            .is_err());
3884    }
3885
3886    // ── create_db_instance_read_replica ──
3887
3888    #[tokio::test]
3889    async fn create_read_replica_missing_source_errors() {
3890        let svc = make_service();
3891        let req = request(
3892            "CreateDBInstanceReadReplica",
3893            &[("DBInstanceIdentifier", "replica1")],
3894        );
3895        assert!(svc.create_db_instance_read_replica(&req).await.is_err());
3896    }
3897
3898    #[tokio::test]
3899    async fn create_read_replica_unknown_source_errors() {
3900        let svc = make_service();
3901        let req = request(
3902            "CreateDBInstanceReadReplica",
3903            &[
3904                ("DBInstanceIdentifier", "replica1"),
3905                ("SourceDBInstanceIdentifier", "ghost"),
3906            ],
3907        );
3908        assert!(svc.create_db_instance_read_replica(&req).await.is_err());
3909    }
3910
3911    // ── describe_db_snapshots with filters ──
3912
3913    #[test]
3914    fn describe_db_snapshots_by_snapshot_id_only() {
3915        let svc = make_service();
3916        seed_snapshot(&svc, "s1", "inst1");
3917        let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "s1")]);
3918        let resp = svc.describe_db_snapshots(&req).unwrap();
3919        let b = body_of(resp);
3920        assert!(b.contains("<DBSnapshotIdentifier>s1</DBSnapshotIdentifier>"));
3921    }
3922
3923    #[test]
3924    fn describe_db_snapshots_by_instance_id_returns_matching() {
3925        let svc = make_service();
3926        seed_snapshot(&svc, "s1", "inst1");
3927        seed_snapshot(&svc, "s2", "inst2");
3928        let req = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "inst1")]);
3929        let resp = svc.describe_db_snapshots(&req).unwrap();
3930        let b = body_of(resp);
3931        assert!(b.contains("s1"));
3932        assert!(!b.contains("<DBSnapshotIdentifier>s2</DBSnapshotIdentifier>"));
3933    }
3934
3935    // ── modify_db_parameter_group ──
3936
3937    #[test]
3938    fn modify_db_parameter_group_missing_name() {
3939        let svc = make_service();
3940        let req = request("ModifyDBParameterGroup", &[]);
3941        assert!(svc.modify_db_parameter_group(&req).is_err());
3942    }
3943
3944    // ── modify_db_subnet_group ──
3945
3946    #[test]
3947    fn modify_db_subnet_group_unknown_errors() {
3948        let svc = make_service();
3949        let req = request(
3950            "ModifyDBSubnetGroup",
3951            &[
3952                ("DBSubnetGroupName", "ghost"),
3953                ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
3954                ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
3955            ],
3956        );
3957        assert!(svc.modify_db_subnet_group(&req).is_err());
3958    }
3959
3960    // ── describe_db_instances ──
3961
3962    #[test]
3963    fn describe_db_instances_empty_returns_xml() {
3964        let svc = make_service();
3965        let req = request("DescribeDBInstances", &[]);
3966        let resp = svc.describe_db_instances(&req).unwrap();
3967        let b = body_of(resp);
3968        assert!(b.contains("DescribeDBInstancesResult"));
3969    }
3970
3971    #[test]
3972    fn describe_db_snapshots_empty_returns_empty_list() {
3973        let svc = make_service();
3974        let req = request("DescribeDBSnapshots", &[]);
3975        let resp = svc.describe_db_snapshots(&req).unwrap();
3976        let b = body_of(resp);
3977        assert!(b.contains("DescribeDBSnapshotsResult"));
3978    }
3979
3980    #[test]
3981    fn add_tags_unknown_resource_errors() {
3982        let svc = make_service();
3983        let req = request(
3984            "AddTagsToResource",
3985            &[
3986                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
3987                ("Tags.member.1.Key", "k"),
3988                ("Tags.member.1.Value", "v"),
3989            ],
3990        );
3991        assert!(svc.add_tags_to_resource(&req).is_err());
3992    }
3993
3994    #[test]
3995    fn remove_tags_unknown_resource_errors() {
3996        let svc = make_service();
3997        let req = request(
3998            "RemoveTagsFromResource",
3999            &[
4000                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4001                ("TagKeys.member.1", "k"),
4002            ],
4003        );
4004        assert!(svc.remove_tags_from_resource(&req).is_err());
4005    }
4006
4007    #[test]
4008    fn create_db_parameter_group_missing_name_errors() {
4009        let svc = make_service();
4010        let req = request(
4011            "CreateDBParameterGroup",
4012            &[
4013                ("DBParameterGroupFamily", "postgres16"),
4014                ("Description", "d"),
4015            ],
4016        );
4017        assert!(svc.create_db_parameter_group(&req).is_err());
4018    }
4019
4020    #[test]
4021    fn create_db_subnet_group_missing_desc_errors() {
4022        let svc = make_service();
4023        let req = request(
4024            "CreateDBSubnetGroup",
4025            &[
4026                ("DBSubnetGroupName", "sg1"),
4027                ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4028                ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4029            ],
4030        );
4031        assert!(svc.create_db_subnet_group(&req).is_err());
4032    }
4033
4034    #[tokio::test]
4035    async fn create_db_instance_missing_class_errors() {
4036        let svc = make_service();
4037        let req = request(
4038            "CreateDBInstance",
4039            &[
4040                ("DBInstanceIdentifier", "miss-class"),
4041                ("Engine", "postgres"),
4042                ("AllocatedStorage", "20"),
4043                ("MasterUsername", "admin"),
4044                ("MasterUserPassword", "secretpass"),
4045            ],
4046        );
4047        assert!(svc.create_db_instance(&req).await.is_err());
4048    }
4049
4050    #[tokio::test]
4051    async fn create_db_instance_missing_master_username_errors() {
4052        let svc = make_service();
4053        let req = request(
4054            "CreateDBInstance",
4055            &[
4056                ("DBInstanceIdentifier", "miss-mu"),
4057                ("Engine", "postgres"),
4058                ("DBInstanceClass", "db.t3.micro"),
4059                ("AllocatedStorage", "20"),
4060                ("MasterUserPassword", "secretpass"),
4061            ],
4062        );
4063        assert!(svc.create_db_instance(&req).await.is_err());
4064    }
4065
4066    #[test]
4067    fn modify_db_instance_missing_id_errors() {
4068        let svc = make_service();
4069        let req = request("ModifyDBInstance", &[]);
4070        assert!(svc.modify_db_instance(&req).is_err());
4071    }
4072
4073    #[test]
4074    fn modify_db_parameter_group_unknown_pg_errors() {
4075        let svc = make_service();
4076        let req = request(
4077            "ModifyDBParameterGroup",
4078            &[
4079                ("DBParameterGroupName", "ghost"),
4080                ("Parameters.member.1.ParameterName", "p"),
4081                ("Parameters.member.1.ParameterValue", "v"),
4082                ("Parameters.member.1.ApplyMethod", "immediate"),
4083            ],
4084        );
4085        assert!(svc.modify_db_parameter_group(&req).is_err());
4086    }
4087
4088    #[test]
4089    fn describe_db_parameter_groups_unknown_errors() {
4090        let svc = make_service();
4091        let req = request(
4092            "DescribeDBParameterGroups",
4093            &[("DBParameterGroupName", "ghost")],
4094        );
4095        assert!(svc.describe_db_parameter_groups(&req).is_err());
4096    }
4097
4098    #[test]
4099    fn describe_db_subnet_groups_unknown_errors() {
4100        let svc = make_service();
4101        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4102        assert!(svc.describe_db_subnet_groups(&req).is_err());
4103    }
4104}