1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
12use fakecloud_persistence::SnapshotStore;
13
14use crate::runtime::{RdsRuntime, RuntimeError};
15use crate::state::{
16 default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
17 DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
18 SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
19};
20
/// XML namespace URI of the RDS API, dated `2014-10-31`.
///
/// NOTE(review): presumably emitted as the `xmlns` of response envelopes; the
/// use site is not visible in this part of the file — confirm.
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
22
/// Reports whether `action` names an RDS action that changes service state.
///
/// The comparison is exact and case-sensitive; any name not listed here
/// (including every `Describe*` / `List*` action and unknown names) yields
/// `false`.
fn is_mutating_action(action: &str) -> bool {
    matches!(
        action,
        "AddTagsToResource"
            | "CreateDBInstance"
            | "CreateDBInstanceReadReplica"
            | "CreateDBParameterGroup"
            | "CreateDBSnapshot"
            | "CreateDBSubnetGroup"
            | "DeleteDBInstance"
            | "DeleteDBParameterGroup"
            | "DeleteDBSnapshot"
            | "DeleteDBSubnetGroup"
            | "ModifyDBInstance"
            | "ModifyDBParameterGroup"
            | "ModifyDBSubnetGroup"
            | "RebootDBInstance"
            | "RemoveTagsFromResource"
            | "RestoreDBInstanceFromDBSnapshot"
    )
}
/// Names of every action this service dispatches, in alphabetical order.
///
/// Returned verbatim by `RdsService::supported_actions`; each entry has a
/// matching arm in `RdsService::handle`.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddTagsToResource",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "DeleteDBInstance",
    "DeleteDBParameterGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DescribeDBEngineVersions",
    "DescribeDBInstances",
    "DescribeDBParameterGroups",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeOrderableDBInstanceOptions",
    "ListTagsForResource",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBSubnetGroup",
    "RebootDBInstance",
    "RemoveTagsFromResource",
    "RestoreDBInstanceFromDBSnapshot",
];
69
70pub struct RdsService {
71 state: SharedRdsState,
72 runtime: Option<Arc<RdsRuntime>>,
73 snapshot_store: Option<Arc<dyn SnapshotStore>>,
74 snapshot_lock: Arc<AsyncMutex<()>>,
75}
76
77impl RdsService {
78 pub fn new(state: SharedRdsState) -> Self {
79 Self {
80 state,
81 runtime: None,
82 snapshot_store: None,
83 snapshot_lock: Arc::new(AsyncMutex::new(())),
84 }
85 }
86
87 pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
88 self.runtime = Some(runtime);
89 self
90 }
91
92 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
93 self.snapshot_store = Some(store);
94 self
95 }
96
97 async fn save_snapshot(&self) {
98 let Some(store) = self.snapshot_store.clone() else {
99 return;
100 };
101 let _guard = self.snapshot_lock.lock().await;
102 let snapshot = RdsSnapshot {
103 schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
104 state: None,
105 accounts: Some(self.state.read().clone()),
106 };
107 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
108 let bytes = serde_json::to_vec(&snapshot)
109 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
110 store.save(&bytes)
111 })
112 .await;
113 match join {
114 Ok(Ok(())) => {}
115 Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
116 Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
117 }
118 }
119
120 fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
126 self.runtime.as_ref().ok_or_else(|| {
127 AwsServiceError::aws_error(
128 StatusCode::SERVICE_UNAVAILABLE,
129 "InvalidParameterValue",
130 "Docker/Podman is required for RDS DB instances but is not available",
131 )
132 })
133 }
134}
135
136#[async_trait]
137impl AwsService for RdsService {
138 fn service_name(&self) -> &str {
139 "rds"
140 }
141
142 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
143 let mutates = is_mutating_action(request.action.as_str());
144 let result = match request.action.as_str() {
145 "AddTagsToResource" => self.add_tags_to_resource(&request),
146 "CreateDBInstance" => self.create_db_instance(&request).await,
147 "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
148 "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
149 "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
150 "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
151 "DeleteDBInstance" => self.delete_db_instance(&request).await,
152 "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
153 "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
154 "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
155 "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
156 "DescribeDBInstances" => self.describe_db_instances(&request),
157 "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
158 "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
159 "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
160 "DescribeOrderableDBInstanceOptions" => {
161 self.describe_orderable_db_instance_options(&request)
162 }
163 "ListTagsForResource" => self.list_tags_for_resource(&request),
164 "ModifyDBInstance" => self.modify_db_instance(&request),
165 "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
166 "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
167 "RebootDBInstance" => self.reboot_db_instance(&request).await,
168 "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
169 "RestoreDBInstanceFromDBSnapshot" => {
170 self.restore_db_instance_from_db_snapshot(&request).await
171 }
172 _ => Err(AwsServiceError::action_not_implemented(
173 self.service_name(),
174 &request.action,
175 )),
176 };
177 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
178 self.save_snapshot().await;
179 }
180 result
181 }
182
183 fn supported_actions(&self) -> &[&str] {
184 SUPPORTED_ACTIONS
185 }
186}
187
188impl RdsService {
189 async fn create_db_instance(
190 &self,
191 request: &AwsRequest,
192 ) -> Result<AwsResponse, AwsServiceError> {
193 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
194 let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
195 let db_instance_class = required_param(request, "DBInstanceClass")?;
196 let engine = required_param(request, "Engine")?;
197 let master_username = required_param(request, "MasterUsername")?;
198 let master_user_password = required_param(request, "MasterUserPassword")?;
199 let db_name = optional_param(request, "DBName");
200 let engine_version =
201 optional_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
202 let publicly_accessible =
203 parse_optional_bool(optional_param(request, "PubliclyAccessible").as_deref())?
204 .unwrap_or(true);
205 let deletion_protection =
206 parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?
207 .unwrap_or(false);
208 let port = optional_i32_param(request, "Port")?
209 .unwrap_or_else(|| default_port_for_engine(&engine));
210 let vpc_security_group_ids = parse_vpc_security_group_ids(request);
211
212 let db_parameter_group_name = optional_param(request, "DBParameterGroupName")
213 .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
214
215 let backup_retention_period =
216 optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
217 let preferred_backup_window = optional_param(request, "PreferredBackupWindow")
218 .unwrap_or_else(|| "03:00-04:00".to_string());
219 let option_group_name = optional_param(request, "OptionGroupName");
220 let multi_az =
221 parse_optional_bool(optional_param(request, "MultiAZ").as_deref())?.unwrap_or(false);
222
223 validate_create_request(
224 &db_instance_identifier,
225 allocated_storage,
226 &db_instance_class,
227 &engine,
228 &engine_version,
229 port,
230 )?;
231
232 {
233 let mut accounts = self.state.write();
234 let state = accounts.get_or_create(&request.account_id);
235 if !state.begin_instance_creation(&db_instance_identifier) {
236 return Err(AwsServiceError::aws_error(
237 StatusCode::BAD_REQUEST,
238 "DBInstanceAlreadyExists",
239 format!("DBInstance {} already exists.", db_instance_identifier),
240 ));
241 }
242 if let Some(ref pg_name) = db_parameter_group_name {
244 if !state.parameter_groups.contains_key(pg_name) {
245 state.cancel_instance_creation(&db_instance_identifier);
246 return Err(AwsServiceError::aws_error(
247 StatusCode::NOT_FOUND,
248 "DBParameterGroupNotFound",
249 format!("DBParameterGroup {} not found.", pg_name),
250 ));
251 }
252 }
253 }
254
255 let runtime = self.require_runtime()?;
256
257 let logical_db_name = db_name
258 .clone()
259 .unwrap_or_else(|| default_db_name(&engine).to_string());
260 let running = runtime
261 .ensure_postgres(
262 &db_instance_identifier,
263 &engine,
264 &engine_version,
265 &master_username,
266 &master_user_password,
267 &logical_db_name,
268 )
269 .await
270 .map_err(|error| {
271 self.state
272 .write()
273 .get_or_create(&request.account_id)
274 .cancel_instance_creation(&db_instance_identifier);
275 runtime_error_to_service_error(error)
276 })?;
277
278 let mut accounts = self.state.write();
279 let state = accounts.get_or_create(&request.account_id);
280 let created_at = Utc::now();
281 let instance = DbInstance {
282 db_instance_identifier: db_instance_identifier.clone(),
283 db_instance_arn: state.db_instance_arn(&db_instance_identifier),
284 db_instance_class: db_instance_class.clone(),
285 engine: engine.clone(),
286 engine_version: engine_version.clone(),
287 db_instance_status: "available".to_string(),
288 master_username: master_username.clone(),
289 db_name: db_name.clone(),
290 endpoint_address: "127.0.0.1".to_string(),
291 port: i32::from(running.host_port),
292 allocated_storage,
293 publicly_accessible,
294 deletion_protection,
295 created_at,
296 dbi_resource_id: state.next_dbi_resource_id(),
297 master_user_password,
298 container_id: running.container_id,
299 host_port: running.host_port,
300 tags: Vec::new(),
301 read_replica_source_db_instance_identifier: None,
302 read_replica_db_instance_identifiers: Vec::new(),
303 vpc_security_group_ids,
304 db_parameter_group_name,
305 backup_retention_period,
306 preferred_backup_window,
307 latest_restorable_time: if backup_retention_period > 0 {
308 Some(created_at)
309 } else {
310 None
311 },
312 option_group_name,
313 multi_az,
314 pending_modified_values: None,
315 };
316 state.finish_instance_creation(instance.clone());
317
318 Ok(AwsResponse::xml(
319 StatusCode::OK,
320 xml_wrap(
321 "CreateDBInstance",
322 &format!(
323 "<DBInstance>{}</DBInstance>",
324 db_instance_xml(&instance, Some("creating"))
325 ),
326 &request.request_id,
327 ),
328 ))
329 }
330
331 async fn delete_db_instance(
332 &self,
333 request: &AwsRequest,
334 ) -> Result<AwsResponse, AwsServiceError> {
335 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
336 let skip_final_snapshot =
337 parse_optional_bool(optional_param(request, "SkipFinalSnapshot").as_deref())?
338 .unwrap_or(false);
339 let final_db_snapshot_identifier = optional_param(request, "FinalDBSnapshotIdentifier");
340
341 if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
342 return Err(AwsServiceError::aws_error(
343 StatusCode::BAD_REQUEST,
344 "InvalidParameterCombination",
345 "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
346 ));
347 }
348 if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
349 return Err(AwsServiceError::aws_error(
350 StatusCode::BAD_REQUEST,
351 "InvalidParameterCombination",
352 "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
353 ));
354 }
355
356 {
358 let accounts = self.state.read();
359 let empty = RdsState::new(&request.account_id, &request.region);
360 let state = accounts.get(&request.account_id).unwrap_or(&empty);
361 if let Some(instance) = state.instances.get(&db_instance_identifier) {
362 if instance.deletion_protection {
363 return Err(AwsServiceError::aws_error(
364 StatusCode::BAD_REQUEST,
365 "InvalidDBInstanceState",
366 format!(
367 "DBInstance {} cannot be deleted because deletion protection is enabled.",
368 db_instance_identifier
369 ),
370 ));
371 }
372 } else {
373 return Err(db_instance_not_found(&db_instance_identifier));
374 }
375 }
376
377 if let Some(ref snapshot_id) = final_db_snapshot_identifier {
378 self.create_final_db_snapshot(
379 &db_instance_identifier,
380 snapshot_id,
381 &request.account_id,
382 &request.region,
383 )
384 .await?;
385 }
386
387 let instance = {
388 let mut accounts = self.state.write();
389 let state = accounts.get_or_create(&request.account_id);
390 let instance = state
391 .instances
392 .remove(&db_instance_identifier)
393 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
394
395 if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
396 if let Some(source) = state.instances.get_mut(source_id) {
397 source
398 .read_replica_db_instance_identifiers
399 .retain(|id| id != &db_instance_identifier);
400 }
401 }
402
403 for replica_id in &instance.read_replica_db_instance_identifiers {
404 if let Some(replica) = state.instances.get_mut(replica_id) {
405 replica.read_replica_source_db_instance_identifier = None;
406 }
407 }
408
409 instance
410 };
411
412 if let Some(runtime) = &self.runtime {
413 runtime.stop_container(&db_instance_identifier).await;
414 }
415
416 Ok(AwsResponse::xml(
417 StatusCode::OK,
418 xml_wrap(
419 "DeleteDBInstance",
420 &format!(
421 "<DBInstance>{}</DBInstance>",
422 db_instance_xml(&instance, Some("deleting"))
423 ),
424 &request.request_id,
425 ),
426 ))
427 }
428
429 async fn create_final_db_snapshot(
435 &self,
436 db_instance_identifier: &str,
437 snapshot_id: &str,
438 account_id: &str,
439 region: &str,
440 ) -> Result<(), AwsServiceError> {
441 let runtime = self.runtime.as_ref().ok_or_else(|| {
442 AwsServiceError::aws_error(
443 StatusCode::SERVICE_UNAVAILABLE,
444 "InvalidParameterValue",
445 "Docker/Podman is required for RDS snapshots but is not available",
446 )
447 })?;
448
449 let (instance_for_snapshot, db_name) = {
450 let accounts = self.state.read();
451 let empty = RdsState::new(account_id, region);
452 let state = accounts.get(account_id).unwrap_or(&empty);
453
454 if state.snapshots.contains_key(snapshot_id) {
455 return Err(AwsServiceError::aws_error(
456 StatusCode::CONFLICT,
457 "DBSnapshotAlreadyExists",
458 format!("DBSnapshot {snapshot_id} already exists."),
459 ));
460 }
461
462 let instance = state
463 .instances
464 .get(db_instance_identifier)
465 .cloned()
466 .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
467
468 let default_db = default_db_name(&instance.engine);
469 let db_name = instance
470 .db_name
471 .as_deref()
472 .unwrap_or(default_db)
473 .to_string();
474
475 (instance, db_name)
476 };
477
478 let dump_data = runtime
479 .dump_database(
480 db_instance_identifier,
481 &instance_for_snapshot.engine,
482 &instance_for_snapshot.master_username,
483 &instance_for_snapshot.master_user_password,
484 &db_name,
485 )
486 .await
487 .map_err(runtime_error_to_service_error)?;
488
489 let mut accounts = self.state.write();
490 let state = accounts.get_or_create(account_id);
491
492 if state.snapshots.contains_key(snapshot_id) {
493 return Err(AwsServiceError::aws_error(
494 StatusCode::CONFLICT,
495 "DBSnapshotAlreadyExists",
496 format!("DBSnapshot {snapshot_id} already exists."),
497 ));
498 }
499
500 let snapshot_arn = state.db_snapshot_arn(snapshot_id);
501
502 let snapshot = DbSnapshot {
503 db_snapshot_identifier: snapshot_id.to_string(),
504 db_snapshot_arn: snapshot_arn,
505 db_instance_identifier: db_instance_identifier.to_string(),
506 snapshot_create_time: Utc::now(),
507 engine: instance_for_snapshot.engine.clone(),
508 engine_version: instance_for_snapshot.engine_version.clone(),
509 allocated_storage: instance_for_snapshot.allocated_storage,
510 status: "available".to_string(),
511 port: instance_for_snapshot.port,
512 master_username: instance_for_snapshot.master_username.clone(),
513 db_name: instance_for_snapshot.db_name.clone(),
514 dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
515 snapshot_type: "manual".to_string(),
516 master_user_password: instance_for_snapshot.master_user_password.clone(),
517 tags: Vec::new(),
518 dump_data,
519 };
520
521 state.snapshots.insert(snapshot_id.to_string(), snapshot);
522 Ok(())
523 }
524
525 fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
526 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
527 let db_instance_class = optional_param(request, "DBInstanceClass");
528 let deletion_protection =
529 parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?;
530 let apply_immediately =
531 parse_optional_bool(optional_param(request, "ApplyImmediately").as_deref())?;
532
533 let vpc_security_group_ids = {
535 let mut ids = Vec::new();
536 for index in 1.. {
537 let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
538 match optional_param(request, &sg_id_name) {
539 Some(sg_id) => ids.push(sg_id),
540 None => break,
541 }
542 }
543 if ids.is_empty() {
544 None
545 } else {
546 Some(ids)
547 }
548 };
549
550 if db_instance_class.is_none()
551 && deletion_protection.is_none()
552 && vpc_security_group_ids.is_none()
553 {
554 return Err(AwsServiceError::aws_error(
555 StatusCode::BAD_REQUEST,
556 "InvalidParameterCombination",
557 "At least one supported mutable field must be provided.",
558 ));
559 }
560 if let Some(ref class) = db_instance_class {
561 validate_db_instance_class(class)?;
562 }
563
564 let mut accounts = self.state.write();
565 let state = accounts.get_or_create(&request.account_id);
566 let instance = state
567 .instances
568 .get_mut(&db_instance_identifier)
569 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
570
571 if apply_immediately == Some(false) {
573 let pending = instance
574 .pending_modified_values
575 .get_or_insert(Default::default());
576 if let Some(class) = db_instance_class {
577 pending.db_instance_class = Some(class);
578 }
579 if let Some(deletion_protection) = deletion_protection {
582 instance.deletion_protection = deletion_protection;
583 }
584 if let Some(security_group_ids) = vpc_security_group_ids {
585 instance.vpc_security_group_ids = security_group_ids;
586 }
587 } else {
588 if let Some(class) = db_instance_class {
590 instance.db_instance_class = class;
591 }
592 if let Some(deletion_protection) = deletion_protection {
593 instance.deletion_protection = deletion_protection;
594 }
595 if let Some(security_group_ids) = vpc_security_group_ids {
596 instance.vpc_security_group_ids = security_group_ids;
597 }
598 }
599
600 Ok(AwsResponse::xml(
601 StatusCode::OK,
602 xml_wrap(
603 "ModifyDBInstance",
604 &format!(
605 "<DBInstance>{}</DBInstance>",
606 db_instance_xml(instance, Some("modifying"))
607 ),
608 &request.request_id,
609 ),
610 ))
611 }
612
613 async fn reboot_db_instance(
614 &self,
615 request: &AwsRequest,
616 ) -> Result<AwsResponse, AwsServiceError> {
617 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
618 let force_failover =
619 parse_optional_bool(optional_param(request, "ForceFailover").as_deref())?;
620 if force_failover == Some(true) {
621 return Err(AwsServiceError::aws_error(
622 StatusCode::BAD_REQUEST,
623 "InvalidParameterCombination",
624 "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
625 ));
626 }
627
628 let instance = {
629 let accounts = self.state.read();
630 let empty = RdsState::new(&request.account_id, &request.region);
631 let state = accounts.get(&request.account_id).unwrap_or(&empty);
632 state
633 .instances
634 .get(&db_instance_identifier)
635 .cloned()
636 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
637 };
638
639 let runtime = self.require_runtime()?;
640
641 let running = runtime
642 .restart_container(
643 &db_instance_identifier,
644 &instance.engine,
645 &instance.master_username,
646 &instance.master_user_password,
647 instance
648 .db_name
649 .as_deref()
650 .unwrap_or(default_db_name(&instance.engine)),
651 )
652 .await
653 .map_err(runtime_error_to_service_error)?;
654
655 let instance = {
656 let mut accounts = self.state.write();
657 let state = accounts.get_or_create(&request.account_id);
658 let instance = state
659 .instances
660 .get_mut(&db_instance_identifier)
661 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
662 instance.host_port = running.host_port;
663 instance.port = i32::from(running.host_port);
664
665 if let Some(pending) = instance.pending_modified_values.take() {
667 if let Some(class) = pending.db_instance_class {
668 instance.db_instance_class = class;
669 }
670 if let Some(allocated_storage) = pending.allocated_storage {
671 instance.allocated_storage = allocated_storage;
672 }
673 if let Some(backup_retention_period) = pending.backup_retention_period {
674 instance.backup_retention_period = backup_retention_period;
675 }
676 if let Some(multi_az) = pending.multi_az {
677 instance.multi_az = multi_az;
678 }
679 if let Some(engine_version) = pending.engine_version {
680 instance.engine_version = engine_version;
681 }
682 if let Some(master_user_password) = pending.master_user_password {
683 instance.master_user_password = master_user_password;
684 }
685 }
686
687 instance.clone()
688 };
689
690 Ok(AwsResponse::xml(
691 StatusCode::OK,
692 xml_wrap(
693 "RebootDBInstance",
694 &format!(
695 "<DBInstance>{}</DBInstance>",
696 db_instance_xml(&instance, Some("rebooting"))
697 ),
698 &request.request_id,
699 ),
700 ))
701 }
702
703 fn describe_db_engine_versions(
704 &self,
705 request: &AwsRequest,
706 ) -> Result<AwsResponse, AwsServiceError> {
707 let engine = optional_param(request, "Engine");
708 let engine_version = optional_param(request, "EngineVersion");
709 let family = optional_param(request, "DBParameterGroupFamily");
710 let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
711
712 let mut versions = filter_engine_versions(
713 &default_engine_versions(),
714 &engine,
715 &engine_version,
716 &family,
717 );
718
719 if default_only.unwrap_or(false) {
720 versions.truncate(1);
721 }
722
723 Ok(AwsResponse::xml(
724 StatusCode::OK,
725 xml_wrap(
726 "DescribeDBEngineVersions",
727 &format!(
728 "<DBEngineVersions>{}</DBEngineVersions>",
729 versions.iter().map(engine_version_xml).collect::<String>()
730 ),
731 &request.request_id,
732 ),
733 ))
734 }
735
736 fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
737 let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
738 let marker = optional_param(request, "Marker");
739 let max_records = optional_param(request, "MaxRecords");
740
741 let accounts = self.state.read();
742 let empty = RdsState::new(&request.account_id, &request.region);
743 let state = accounts.get(&request.account_id).unwrap_or(&empty);
744
745 if let Some(identifier) = db_instance_identifier {
747 let instance = state
748 .instances
749 .get(&identifier)
750 .cloned()
751 .ok_or_else(|| db_instance_not_found(&identifier))?;
752
753 return Ok(AwsResponse::xml(
754 StatusCode::OK,
755 xml_wrap(
756 "DescribeDBInstances",
757 &format!(
758 "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
759 db_instance_xml(&instance, None)
760 ),
761 &request.request_id,
762 ),
763 ));
764 }
765
766 let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
768 instances.sort_by(|a, b| {
769 a.created_at
770 .cmp(&b.created_at)
771 .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
772 });
773
774 let paginated = paginate(instances, marker, max_records, |inst| {
776 &inst.db_instance_identifier
777 })?;
778
779 let marker_xml = paginated
780 .next_marker
781 .as_ref()
782 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
783 .unwrap_or_default();
784
785 Ok(AwsResponse::xml(
786 StatusCode::OK,
787 xml_wrap(
788 "DescribeDBInstances",
789 &format!(
790 "<DBInstances>{}</DBInstances>{}",
791 paginated
792 .items
793 .iter()
794 .map(|instance| {
795 format!(
796 "<DBInstance>{}</DBInstance>",
797 db_instance_xml(instance, None)
798 )
799 })
800 .collect::<String>(),
801 marker_xml
802 ),
803 &request.request_id,
804 ),
805 ))
806 }
807
808 fn describe_orderable_db_instance_options(
809 &self,
810 request: &AwsRequest,
811 ) -> Result<AwsResponse, AwsServiceError> {
812 let engine = optional_param(request, "Engine");
813 let engine_version = optional_param(request, "EngineVersion");
814 let db_instance_class = optional_param(request, "DBInstanceClass");
815 let license_model = optional_param(request, "LicenseModel");
816 let vpc = parse_optional_bool(optional_param(request, "Vpc").as_deref())?;
817
818 let options = filter_orderable_options(
819 &default_orderable_options(),
820 &engine,
821 &engine_version,
822 &db_instance_class,
823 &license_model,
824 vpc,
825 );
826
827 Ok(AwsResponse::xml(
828 StatusCode::OK,
829 xml_wrap(
830 "DescribeOrderableDBInstanceOptions",
831 &format!(
832 "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
833 options.iter().map(orderable_option_xml).collect::<String>()
834 ),
835 &request.request_id,
836 ),
837 ))
838 }
839
840 fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
841 let resource_name = required_param(request, "ResourceName")?;
842 let tags = parse_tags(request)?;
843
844 if tags.is_empty() {
845 return Err(AwsServiceError::aws_error(
846 StatusCode::BAD_REQUEST,
847 "MissingParameter",
848 "The request must contain the parameter Tags.",
849 ));
850 }
851
852 let mut accounts = self.state.write();
853 let state = accounts.get_or_create(&request.account_id);
854 let instance = find_instance_by_arn_mut(state, &resource_name)?;
855 merge_tags(&mut instance.tags, &tags);
856
857 Ok(AwsResponse::xml(
858 StatusCode::OK,
859 xml_wrap("AddTagsToResource", "", &request.request_id),
860 ))
861 }
862
863 fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
864 let resource_name = required_param(request, "ResourceName")?;
865 if query_param_prefix_exists(request, "Filters.") {
866 return Err(AwsServiceError::aws_error(
867 StatusCode::BAD_REQUEST,
868 "InvalidParameterValue",
869 "Filters are not yet supported for ListTagsForResource.",
870 ));
871 }
872
873 let accounts = self.state.read();
874 let empty = RdsState::new(&request.account_id, &request.region);
875 let state = accounts.get(&request.account_id).unwrap_or(&empty);
876 let instance = find_instance_by_arn(state, &resource_name)?;
877 let tag_xml = instance.tags.iter().map(tag_xml).collect::<String>();
878
879 Ok(AwsResponse::xml(
880 StatusCode::OK,
881 xml_wrap(
882 "ListTagsForResource",
883 &format!("<TagList>{tag_xml}</TagList>"),
884 &request.request_id,
885 ),
886 ))
887 }
888
889 fn remove_tags_from_resource(
890 &self,
891 request: &AwsRequest,
892 ) -> Result<AwsResponse, AwsServiceError> {
893 let resource_name = required_param(request, "ResourceName")?;
894 let tag_keys = parse_tag_keys(request)?;
895
896 if tag_keys.is_empty() {
897 return Err(AwsServiceError::aws_error(
898 StatusCode::BAD_REQUEST,
899 "MissingParameter",
900 "The request must contain the parameter TagKeys.",
901 ));
902 }
903
904 let mut accounts = self.state.write();
905 let state = accounts.get_or_create(&request.account_id);
906 let instance = find_instance_by_arn_mut(state, &resource_name)?;
907 instance
908 .tags
909 .retain(|tag| !tag_keys.iter().any(|key| key == &tag.key));
910
911 Ok(AwsResponse::xml(
912 StatusCode::OK,
913 xml_wrap("RemoveTagsFromResource", "", &request.request_id),
914 ))
915 }
916
917 async fn create_db_snapshot(
918 &self,
919 request: &AwsRequest,
920 ) -> Result<AwsResponse, AwsServiceError> {
921 let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
922 let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
923
924 let runtime = self.runtime.as_ref().ok_or_else(|| {
925 AwsServiceError::aws_error(
926 StatusCode::SERVICE_UNAVAILABLE,
927 "InvalidParameterValue",
928 "Docker/Podman is required for RDS snapshots but is not available",
929 )
930 })?;
931
932 let (instance, db_name) = {
933 let accounts = self.state.read();
934 let empty = RdsState::new(&request.account_id, &request.region);
935 let state = accounts.get(&request.account_id).unwrap_or(&empty);
936
937 if state.snapshots.contains_key(&db_snapshot_identifier) {
938 return Err(AwsServiceError::aws_error(
939 StatusCode::CONFLICT,
940 "DBSnapshotAlreadyExists",
941 format!("DBSnapshot {db_snapshot_identifier} already exists."),
942 ));
943 }
944
945 let instance = state
946 .instances
947 .get(&db_instance_identifier)
948 .cloned()
949 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
950
951 let default_db = default_db_name(&instance.engine);
952 let db_name = instance
953 .db_name
954 .as_deref()
955 .unwrap_or(default_db)
956 .to_string();
957
958 (instance, db_name)
959 };
960
961 let dump_data = runtime
962 .dump_database(
963 &db_instance_identifier,
964 &instance.engine,
965 &instance.master_username,
966 &instance.master_user_password,
967 &db_name,
968 )
969 .await
970 .map_err(runtime_error_to_service_error)?;
971
972 let mut accounts = self.state.write();
973 let state = accounts.get_or_create(&request.account_id);
974
975 if state.snapshots.contains_key(&db_snapshot_identifier) {
976 return Err(AwsServiceError::aws_error(
977 StatusCode::CONFLICT,
978 "DBSnapshotAlreadyExists",
979 format!("DBSnapshot {db_snapshot_identifier} already exists."),
980 ));
981 }
982
983 let snapshot = DbSnapshot {
984 db_snapshot_identifier: db_snapshot_identifier.clone(),
985 db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
986 db_instance_identifier: instance.db_instance_identifier.clone(),
987 snapshot_create_time: Utc::now(),
988 engine: instance.engine.clone(),
989 engine_version: instance.engine_version.clone(),
990 allocated_storage: instance.allocated_storage,
991 status: "available".to_string(),
992 port: instance.port,
993 master_username: instance.master_username.clone(),
994 db_name: instance.db_name.clone(),
995 dbi_resource_id: instance.dbi_resource_id.clone(),
996 snapshot_type: "manual".to_string(),
997 master_user_password: instance.master_user_password.clone(),
998 tags: Vec::new(),
999 dump_data,
1000 };
1001
1002 state
1003 .snapshots
1004 .insert(db_snapshot_identifier, snapshot.clone());
1005
1006 Ok(AwsResponse::xml(
1007 StatusCode::OK,
1008 xml_wrap(
1009 "CreateDBSnapshot",
1010 &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1011 &request.request_id,
1012 ),
1013 ))
1014 }
1015
1016 fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1017 let db_snapshot_identifier = optional_param(request, "DBSnapshotIdentifier");
1018 let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1019 let marker = optional_param(request, "Marker");
1020 let max_records = optional_param(request, "MaxRecords");
1021
1022 if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
1023 return Err(AwsServiceError::aws_error(
1024 StatusCode::BAD_REQUEST,
1025 "InvalidParameterCombination",
1026 "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
1027 ));
1028 }
1029
1030 let accounts = self.state.read();
1031 let empty = RdsState::new(&request.account_id, &request.region);
1032 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1033
1034 if let Some(snapshot_id) = db_snapshot_identifier {
1036 let snapshot = state
1037 .snapshots
1038 .get(&snapshot_id)
1039 .cloned()
1040 .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
1041
1042 return Ok(AwsResponse::xml(
1043 StatusCode::OK,
1044 xml_wrap(
1045 "DescribeDBSnapshots",
1046 &format!(
1047 "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
1048 db_snapshot_xml(&snapshot)
1049 ),
1050 &request.request_id,
1051 ),
1052 ));
1053 }
1054
1055 let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
1057 state
1058 .snapshots
1059 .values()
1060 .filter(|s| s.db_instance_identifier == instance_id)
1061 .cloned()
1062 .collect()
1063 } else {
1064 state.snapshots.values().cloned().collect()
1065 };
1066
1067 snapshots.sort_by(|a, b| {
1069 a.snapshot_create_time
1070 .cmp(&b.snapshot_create_time)
1071 .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
1072 });
1073
1074 let paginated = paginate(snapshots, marker, max_records, |snap| {
1076 &snap.db_snapshot_identifier
1077 })?;
1078
1079 let marker_xml = paginated
1080 .next_marker
1081 .as_ref()
1082 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1083 .unwrap_or_default();
1084
1085 Ok(AwsResponse::xml(
1086 StatusCode::OK,
1087 xml_wrap(
1088 "DescribeDBSnapshots",
1089 &format!(
1090 "<DBSnapshots>{}</DBSnapshots>{}",
1091 paginated
1092 .items
1093 .iter()
1094 .map(|snapshot| format!(
1095 "<DBSnapshot>{}</DBSnapshot>",
1096 db_snapshot_xml(snapshot)
1097 ))
1098 .collect::<String>(),
1099 marker_xml
1100 ),
1101 &request.request_id,
1102 ),
1103 ))
1104 }
1105
1106 fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1107 let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1108
1109 let mut accounts = self.state.write();
1110 let state = accounts.get_or_create(&request.account_id);
1111
1112 let snapshot = state
1113 .snapshots
1114 .remove(&db_snapshot_identifier)
1115 .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1116
1117 Ok(AwsResponse::xml(
1118 StatusCode::OK,
1119 xml_wrap(
1120 "DeleteDBSnapshot",
1121 &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1122 &request.request_id,
1123 ),
1124 ))
1125 }
1126
    /// Handles `RestoreDBInstanceFromDBSnapshot`.
    ///
    /// Flow:
    /// 1. Reserve the new instance identifier and copy the snapshot out of the
    ///    state while holding the write lock.
    /// 2. Start a database container for the new instance via the runtime.
    /// 3. Load the snapshot's dump data into that container.
    /// 4. Build the `DbInstance` record and publish it in the state.
    ///
    /// Every failure after the reservation cancels it again so the identifier
    /// can be reused; a failed restore additionally stops the container that
    /// was already started.
    ///
    /// # Errors
    /// * whatever `require_runtime` reports when no runtime is available
    /// * `DBInstanceAlreadyExists` (409) when the identifier cannot be reserved
    /// * snapshot-not-found when `DBSnapshotIdentifier` is unknown
    /// * a mapped runtime error when starting or restoring the database fails
    async fn restore_db_instance_from_db_snapshot(
        &self,
        request: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
        let vpc_security_group_ids = parse_vpc_security_group_ids(request);

        let runtime = self.require_runtime()?;

        // Phase 1: everything that needs the state lock happens inside this
        // block, so the write guard is released before the first `.await`.
        let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
            let mut accounts = self.state.write();
            let state = accounts.get_or_create(&request.account_id);

            // Reserve the identifier; `false` is reported as "already exists".
            if !state.begin_instance_creation(&db_instance_identifier) {
                return Err(AwsServiceError::aws_error(
                    StatusCode::CONFLICT,
                    "DBInstanceAlreadyExists",
                    format!("DBInstance {db_instance_identifier} already exists."),
                ));
            }

            // Work on a clone so the snapshot stays usable after the lock is gone.
            let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
                Some(s) => s,
                None => {
                    // Undo the reservation made just above before bailing out.
                    state.cancel_instance_creation(&db_instance_identifier);
                    return Err(db_snapshot_not_found(&db_snapshot_identifier));
                }
            };

            let dbi_resource_id = state.next_dbi_resource_id();
            let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
            let created_at = Utc::now();

            (snapshot, dbi_resource_id, db_instance_arn, created_at)
        };

        // Snapshots without an explicit database name fall back to the
        // engine's default name.
        let db_name = snapshot
            .db_name
            .as_deref()
            .unwrap_or(default_db_name(&snapshot.engine));
        // Phase 2: start the backing container.
        // NOTE(review): `ensure_postgres` receives the snapshot's engine, so it
        // presumably handles engines other than PostgreSQL too — confirm.
        let running = match runtime
            .ensure_postgres(
                &db_instance_identifier,
                &snapshot.engine,
                &snapshot.engine_version,
                &snapshot.master_username,
                &snapshot.master_user_password,
                db_name,
            )
            .await
        {
            Ok(running) => running,
            Err(e) => {
                // No container to clean up here; only release the reservation.
                self.state
                    .write()
                    .get_or_create(&request.account_id)
                    .cancel_instance_creation(&db_instance_identifier);
                return Err(runtime_error_to_service_error(e));
            }
        };

        // Phase 3: load the snapshot's dump into the fresh container.
        if let Err(e) = runtime
            .restore_database(
                &db_instance_identifier,
                &snapshot.engine,
                &snapshot.master_username,
                &snapshot.master_user_password,
                db_name,
                &snapshot.dump_data,
            )
            .await
        {
            // Release the reservation first (the temporary write guard is
            // dropped at the end of the statement), then stop the container
            // that was started in phase 2.
            self.state
                .write()
                .get_or_create(&request.account_id)
                .cancel_instance_creation(&db_instance_identifier);
            runtime.stop_container(&db_instance_identifier).await;
            return Err(runtime_error_to_service_error(e));
        }

        // Phase 4: assemble the instance record and publish it.
        let instance = build_restored_instance(
            &db_instance_identifier,
            db_instance_arn,
            dbi_resource_id,
            created_at,
            vpc_security_group_ids,
            &snapshot,
            &running,
        );

        self.state
            .write()
            .get_or_create(&request.account_id)
            .finish_instance_creation(instance.clone());

        Ok(AwsResponse::xml(
            StatusCode::OK,
            xml_wrap(
                "RestoreDBInstanceFromDBSnapshot",
                &format!(
                    "<DBInstance>{}</DBInstance>",
                    db_instance_xml(&instance, None)
                ),
                &request.request_id,
            ),
        ))
    }
1235
    /// Handles `CreateDBInstanceReadReplica`.
    ///
    /// Flow:
    /// 1. Reserve the replica identifier and clone the source instance record.
    /// 2. Dump the source database through the runtime.
    /// 3. Start a container for the replica and restore the dump into it.
    /// 4. Re-check that the source still exists, link the replica to it and
    ///    publish the replica record.
    ///
    /// Every failure after the reservation cancels it again; failures that
    /// occur once the replica container is running also stop that container.
    ///
    /// # Errors
    /// * `InvalidParameterValue` (503) when no container runtime is configured
    /// * `DBInstanceAlreadyExists` (409) when the identifier cannot be reserved
    /// * instance-not-found when the source is unknown, or has disappeared by
    ///   the time the replica is ready
    /// * a mapped runtime error when dumping, starting or restoring fails
    async fn create_db_instance_read_replica(
        &self,
        request: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
        let source_db_instance_identifier = required_param(request, "SourceDBInstanceIdentifier")?;

        let runtime = self.runtime.as_ref().ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::SERVICE_UNAVAILABLE,
                "InvalidParameterValue",
                "Docker/Podman is required for RDS read replicas but is not available",
            )
        })?;

        // Phase 1: reserve the identifier and copy the source record. The
        // write guard is dropped at the end of this block, before any `.await`.
        let (source_instance, db_name) = {
            let mut accounts = self.state.write();
            let state = accounts.get_or_create(&request.account_id);

            if !state.begin_instance_creation(&db_instance_identifier) {
                return Err(AwsServiceError::aws_error(
                    StatusCode::CONFLICT,
                    "DBInstanceAlreadyExists",
                    format!("DBInstance {db_instance_identifier} already exists."),
                ));
            }

            let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
            {
                Some(inst) => inst,
                None => {
                    // Undo the reservation made just above before bailing out.
                    state.cancel_instance_creation(&db_instance_identifier);
                    return Err(db_instance_not_found(&source_db_instance_identifier));
                }
            };

            // The replica uses the source's database name, or the engine's
            // default name when the source has none.
            let default_db = default_db_name(&source_instance.engine);
            let db_name = source_instance
                .db_name
                .as_deref()
                .unwrap_or(default_db)
                .to_string();

            (source_instance, db_name)
        };

        // Phase 2: dump the source database.
        let dump_data = match runtime
            .dump_database(
                &source_db_instance_identifier,
                &source_instance.engine,
                &source_instance.master_username,
                &source_instance.master_user_password,
                &db_name,
            )
            .await
        {
            Ok(data) => data,
            Err(e) => {
                // No replica container exists yet; only release the reservation.
                self.state
                    .write()
                    .get_or_create(&request.account_id)
                    .cancel_instance_creation(&db_instance_identifier);
                return Err(runtime_error_to_service_error(e));
            }
        };

        // Resource ID and ARN are derived under a read lock; the empty
        // fallback state only applies if the account entry is missing.
        let (dbi_resource_id, db_instance_arn) = {
            let accounts = self.state.read();
            let empty = RdsState::new(&request.account_id, &request.region);
            let s = accounts.get(&request.account_id).unwrap_or(&empty);
            (
                s.next_dbi_resource_id(),
                s.db_instance_arn(&db_instance_identifier),
            )
        };
        let created_at = Utc::now();

        // Phase 3a: start the replica's container with the source's engine,
        // version and credentials.
        // NOTE(review): `ensure_postgres` receives the source's engine, so it
        // presumably handles engines other than PostgreSQL too — confirm.
        let running = match runtime
            .ensure_postgres(
                &db_instance_identifier,
                &source_instance.engine,
                &source_instance.engine_version,
                &source_instance.master_username,
                &source_instance.master_user_password,
                &db_name,
            )
            .await
        {
            Ok(running) => running,
            Err(e) => {
                self.state
                    .write()
                    .get_or_create(&request.account_id)
                    .cancel_instance_creation(&db_instance_identifier);
                return Err(runtime_error_to_service_error(e));
            }
        };

        // Phase 3b: load the dump into the replica's container.
        if let Err(e) = runtime
            .restore_database(
                &db_instance_identifier,
                &source_instance.engine,
                &source_instance.master_username,
                &source_instance.master_user_password,
                &db_name,
                &dump_data,
            )
            .await
        {
            // Release the reservation (the temporary guard is dropped at the
            // end of the statement), then stop the container started above.
            self.state
                .write()
                .get_or_create(&request.account_id)
                .cancel_instance_creation(&db_instance_identifier);
            runtime.stop_container(&db_instance_identifier).await;
            return Err(runtime_error_to_service_error(e));
        }

        let replica = build_read_replica_instance(
            &db_instance_identifier,
            db_instance_arn,
            dbi_resource_id,
            created_at,
            &source_db_instance_identifier,
            &source_instance,
            &running,
        );

        // Phase 4: the lock was not held during the awaits above, so the
        // source is looked up again before linking. The outcome is carried out
        // of the block as a flag because the container must be stopped with
        // `.await`, which happens only after the write guard is gone.
        let source_missing = {
            let mut accounts = self.state.write();
            let state = accounts.get_or_create(&request.account_id);
            match state.instances.get_mut(&source_db_instance_identifier) {
                Some(source) => {
                    // Record the replica on the source, then publish it.
                    source
                        .read_replica_db_instance_identifiers
                        .push(db_instance_identifier.clone());
                    state.finish_instance_creation(replica.clone());
                    false
                }
                None => {
                    state.cancel_instance_creation(&db_instance_identifier);
                    true
                }
            }
        };

        if source_missing {
            runtime.stop_container(&db_instance_identifier).await;
            return Err(db_instance_not_found(&source_db_instance_identifier));
        }

        Ok(AwsResponse::xml(
            StatusCode::OK,
            xml_wrap(
                "CreateDBInstanceReadReplica",
                &format!(
                    "<DBInstance>{}</DBInstance>",
                    db_instance_xml(&replica, None)
                ),
                &request.request_id,
            ),
        ))
    }
1398
1399 fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1400 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1401 let db_subnet_group_description = required_param(request, "DBSubnetGroupDescription")?;
1402 let subnet_ids = parse_subnet_ids(request)?;
1403
1404 if subnet_ids.is_empty() {
1405 return Err(AwsServiceError::aws_error(
1406 StatusCode::BAD_REQUEST,
1407 "InvalidParameterValue",
1408 "At least one subnet must be specified.",
1409 ));
1410 }
1411
1412 if subnet_ids.len() < 2 {
1413 return Err(AwsServiceError::aws_error(
1414 StatusCode::BAD_REQUEST,
1415 "DBSubnetGroupDoesNotCoverEnoughAZs",
1416 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1417 ));
1418 }
1419
1420 let mut accounts = self.state.write();
1421 let state = accounts.get_or_create(&request.account_id);
1422
1423 if state.subnet_groups.contains_key(&db_subnet_group_name) {
1424 return Err(AwsServiceError::aws_error(
1425 StatusCode::CONFLICT,
1426 "DBSubnetGroupAlreadyExists",
1427 format!("DBSubnetGroup {db_subnet_group_name} already exists."),
1428 ));
1429 }
1430
1431 let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
1432 let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1433 .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
1434 .collect();
1435
1436 let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1438 if unique_azs.len() < 2 {
1439 return Err(AwsServiceError::aws_error(
1440 StatusCode::BAD_REQUEST,
1441 "DBSubnetGroupDoesNotCoverEnoughAZs",
1442 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1443 ));
1444 }
1445
1446 let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
1447 let tags = parse_tags(request)?;
1448
1449 let subnet_group = DbSubnetGroup {
1450 db_subnet_group_name: db_subnet_group_name.clone(),
1451 db_subnet_group_arn,
1452 db_subnet_group_description,
1453 vpc_id,
1454 subnet_ids,
1455 subnet_availability_zones,
1456 tags,
1457 };
1458
1459 state
1460 .subnet_groups
1461 .insert(db_subnet_group_name, subnet_group.clone());
1462
1463 Ok(AwsResponse::xml(
1464 StatusCode::OK,
1465 xml_wrap(
1466 "CreateDBSubnetGroup",
1467 &format!(
1468 "<DBSubnetGroup>{}</DBSubnetGroup>",
1469 db_subnet_group_xml(&subnet_group)
1470 ),
1471 &request.request_id,
1472 ),
1473 ))
1474 }
1475
1476 fn describe_db_subnet_groups(
1477 &self,
1478 request: &AwsRequest,
1479 ) -> Result<AwsResponse, AwsServiceError> {
1480 let db_subnet_group_name = optional_param(request, "DBSubnetGroupName");
1481 let marker = optional_param(request, "Marker");
1482 let max_records = optional_param(request, "MaxRecords");
1483
1484 let accounts = self.state.read();
1485 let empty = RdsState::new(&request.account_id, &request.region);
1486 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1487
1488 if let Some(name) = db_subnet_group_name {
1490 let sg = state.subnet_groups.get(&name).ok_or_else(|| {
1491 AwsServiceError::aws_error(
1492 StatusCode::NOT_FOUND,
1493 "DBSubnetGroupNotFoundFault",
1494 format!("DBSubnetGroup {} not found.", name),
1495 )
1496 })?;
1497
1498 return Ok(AwsResponse::xml(
1499 StatusCode::OK,
1500 xml_wrap(
1501 "DescribeDBSubnetGroups",
1502 &format!(
1503 "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
1504 db_subnet_group_xml(sg)
1505 ),
1506 &request.request_id,
1507 ),
1508 ));
1509 }
1510
1511 let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
1513 subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
1514
1515 let paginated = paginate(subnet_groups, marker, max_records, |sg| {
1517 &sg.db_subnet_group_name
1518 })?;
1519
1520 let marker_xml = paginated
1521 .next_marker
1522 .as_ref()
1523 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1524 .unwrap_or_default();
1525
1526 let body = paginated
1527 .items
1528 .iter()
1529 .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
1530 .collect::<Vec<_>>()
1531 .join("");
1532
1533 Ok(AwsResponse::xml(
1534 StatusCode::OK,
1535 xml_wrap(
1536 "DescribeDBSubnetGroups",
1537 &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
1538 &request.request_id,
1539 ),
1540 ))
1541 }
1542
1543 fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1544 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1545
1546 let mut accounts = self.state.write();
1547 let state = accounts.get_or_create(&request.account_id);
1548
1549 if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
1550 return Err(AwsServiceError::aws_error(
1551 StatusCode::NOT_FOUND,
1552 "DBSubnetGroupNotFoundFault",
1553 format!("DBSubnetGroup {db_subnet_group_name} not found."),
1554 ));
1555 }
1556
1557 Ok(AwsResponse::xml(
1558 StatusCode::OK,
1559 xml_wrap("DeleteDBSubnetGroup", "", &request.request_id),
1560 ))
1561 }
1562
1563 fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1564 let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1565 let subnet_ids = parse_subnet_ids(request)?;
1566
1567 if subnet_ids.is_empty() {
1568 return Err(AwsServiceError::aws_error(
1569 StatusCode::BAD_REQUEST,
1570 "InvalidParameterValue",
1571 "At least one subnet must be specified.",
1572 ));
1573 }
1574
1575 if subnet_ids.len() < 2 {
1576 return Err(AwsServiceError::aws_error(
1577 StatusCode::BAD_REQUEST,
1578 "DBSubnetGroupDoesNotCoverEnoughAZs",
1579 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1580 ));
1581 }
1582
1583 let mut accounts = self.state.write();
1584 let state = accounts.get_or_create(&request.account_id);
1585
1586 let region = state.region.clone();
1587
1588 let subnet_group = state
1589 .subnet_groups
1590 .get_mut(&db_subnet_group_name)
1591 .ok_or_else(|| {
1592 AwsServiceError::aws_error(
1593 StatusCode::NOT_FOUND,
1594 "DBSubnetGroupNotFoundFault",
1595 format!("DBSubnetGroup {db_subnet_group_name} not found."),
1596 )
1597 })?;
1598
1599 let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1600 .map(|i| format!("{}{}", ®ion, char::from(b'a' + (i % 6) as u8)))
1601 .collect();
1602
1603 let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1605 if unique_azs.len() < 2 {
1606 return Err(AwsServiceError::aws_error(
1607 StatusCode::BAD_REQUEST,
1608 "DBSubnetGroupDoesNotCoverEnoughAZs",
1609 "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1610 ));
1611 }
1612
1613 subnet_group.subnet_ids = subnet_ids;
1614 subnet_group.subnet_availability_zones = subnet_availability_zones;
1615
1616 let subnet_group_clone = subnet_group.clone();
1617
1618 Ok(AwsResponse::xml(
1619 StatusCode::OK,
1620 xml_wrap(
1621 "ModifyDBSubnetGroup",
1622 &format!(
1623 "<DBSubnetGroup>{}</DBSubnetGroup>",
1624 db_subnet_group_xml(&subnet_group_clone)
1625 ),
1626 &request.request_id,
1627 ),
1628 ))
1629 }
1630
1631 fn create_db_parameter_group(
1632 &self,
1633 request: &AwsRequest,
1634 ) -> Result<AwsResponse, AwsServiceError> {
1635 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1636 let db_parameter_group_family = required_param(request, "DBParameterGroupFamily")?;
1637 let description = required_param(request, "Description")?;
1638
1639 let valid_families = [
1641 "postgres16",
1642 "postgres15",
1643 "postgres14",
1644 "postgres13",
1645 "mysql8.0",
1646 "mysql5.7",
1647 "mariadb10.11",
1648 "mariadb10.6",
1649 ];
1650
1651 if !valid_families.contains(&db_parameter_group_family.as_str()) {
1652 return Err(AwsServiceError::aws_error(
1653 StatusCode::BAD_REQUEST,
1654 "InvalidParameterValue",
1655 format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
1656 ));
1657 }
1658
1659 let mut accounts = self.state.write();
1660 let state = accounts.get_or_create(&request.account_id);
1661
1662 if state
1663 .parameter_groups
1664 .contains_key(&db_parameter_group_name)
1665 {
1666 return Err(AwsServiceError::aws_error(
1667 StatusCode::CONFLICT,
1668 "DBParameterGroupAlreadyExists",
1669 format!("DBParameterGroup {db_parameter_group_name} already exists."),
1670 ));
1671 }
1672
1673 let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
1674 let tags = parse_tags(request)?;
1675
1676 let parameter_group = DbParameterGroup {
1677 db_parameter_group_name: db_parameter_group_name.clone(),
1678 db_parameter_group_arn,
1679 db_parameter_group_family,
1680 description,
1681 parameters: std::collections::HashMap::new(),
1682 tags,
1683 };
1684
1685 state
1686 .parameter_groups
1687 .insert(db_parameter_group_name, parameter_group.clone());
1688
1689 Ok(AwsResponse::xml(
1690 StatusCode::OK,
1691 xml_wrap(
1692 "CreateDBParameterGroup",
1693 &format!(
1694 "<DBParameterGroup>{}</DBParameterGroup>",
1695 db_parameter_group_xml(¶meter_group)
1696 ),
1697 &request.request_id,
1698 ),
1699 ))
1700 }
1701
1702 fn describe_db_parameter_groups(
1703 &self,
1704 request: &AwsRequest,
1705 ) -> Result<AwsResponse, AwsServiceError> {
1706 let db_parameter_group_name = optional_param(request, "DBParameterGroupName");
1707 let marker = optional_param(request, "Marker");
1708 let max_records = optional_param(request, "MaxRecords");
1709
1710 let accounts = self.state.read();
1711 let empty = RdsState::new(&request.account_id, &request.region);
1712 let state = accounts.get(&request.account_id).unwrap_or(&empty);
1713
1714 if let Some(name) = db_parameter_group_name {
1716 let pg = state.parameter_groups.get(&name).ok_or_else(|| {
1717 AwsServiceError::aws_error(
1718 StatusCode::NOT_FOUND,
1719 "DBParameterGroupNotFound",
1720 format!("DBParameterGroup {} not found.", name),
1721 )
1722 })?;
1723
1724 return Ok(AwsResponse::xml(
1725 StatusCode::OK,
1726 xml_wrap(
1727 "DescribeDBParameterGroups",
1728 &format!(
1729 "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
1730 db_parameter_group_xml(pg)
1731 ),
1732 &request.request_id,
1733 ),
1734 ));
1735 }
1736
1737 let mut parameter_groups: Vec<DbParameterGroup> =
1739 state.parameter_groups.values().cloned().collect();
1740 parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
1741
1742 let paginated = paginate(parameter_groups, marker, max_records, |pg| {
1744 &pg.db_parameter_group_name
1745 })?;
1746
1747 let marker_xml = paginated
1748 .next_marker
1749 .as_ref()
1750 .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1751 .unwrap_or_default();
1752
1753 let body = paginated
1754 .items
1755 .iter()
1756 .map(|pg| {
1757 format!(
1758 "<DBParameterGroup>{}</DBParameterGroup>",
1759 db_parameter_group_xml(pg)
1760 )
1761 })
1762 .collect::<Vec<_>>()
1763 .join("");
1764
1765 Ok(AwsResponse::xml(
1766 StatusCode::OK,
1767 xml_wrap(
1768 "DescribeDBParameterGroups",
1769 &format!(
1770 "<DBParameterGroups>{}</DBParameterGroups>{}",
1771 body, marker_xml
1772 ),
1773 &request.request_id,
1774 ),
1775 ))
1776 }
1777
1778 fn delete_db_parameter_group(
1779 &self,
1780 request: &AwsRequest,
1781 ) -> Result<AwsResponse, AwsServiceError> {
1782 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1783
1784 let mut accounts = self.state.write();
1785 let state = accounts.get_or_create(&request.account_id);
1786
1787 if db_parameter_group_name.starts_with("default.") {
1788 return Err(AwsServiceError::aws_error(
1789 StatusCode::BAD_REQUEST,
1790 "InvalidParameterValue",
1791 "Cannot delete default parameter groups.",
1792 ));
1793 }
1794
1795 if state
1796 .parameter_groups
1797 .remove(&db_parameter_group_name)
1798 .is_none()
1799 {
1800 return Err(AwsServiceError::aws_error(
1801 StatusCode::NOT_FOUND,
1802 "DBParameterGroupNotFound",
1803 format!("DBParameterGroup {db_parameter_group_name} not found."),
1804 ));
1805 }
1806
1807 Ok(AwsResponse::xml(
1808 StatusCode::OK,
1809 xml_wrap("DeleteDBParameterGroup", "", &request.request_id),
1810 ))
1811 }
1812
1813 fn modify_db_parameter_group(
1814 &self,
1815 request: &AwsRequest,
1816 ) -> Result<AwsResponse, AwsServiceError> {
1817 let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1818
1819 let mut accounts = self.state.write();
1820 let state = accounts.get_or_create(&request.account_id);
1821
1822 let parameter_group = state
1823 .parameter_groups
1824 .get_mut(&db_parameter_group_name)
1825 .ok_or_else(|| {
1826 AwsServiceError::aws_error(
1827 StatusCode::NOT_FOUND,
1828 "DBParameterGroupNotFound",
1829 format!("DBParameterGroup {db_parameter_group_name} not found."),
1830 )
1831 })?;
1832
1833 if let Some(new_description) = optional_param(request, "Description") {
1834 parameter_group.description = new_description;
1835 }
1836
1837 let parameter_group_clone = parameter_group.clone();
1838
1839 Ok(AwsResponse::xml(
1840 StatusCode::OK,
1841 xml_wrap(
1842 "ModifyDBParameterGroup",
1843 &format!(
1844 "<DBParameterGroupName>{}</DBParameterGroupName>",
1845 xml_escape(¶meter_group_clone.db_parameter_group_name)
1846 ),
1847 &request.request_id,
1848 ),
1849 ))
1850 }
1851}
1852
/// Reads the optional request parameter `name` from `req`.
///
/// Thin local alias for `fakecloud_core::query::optional_query_param`; all
/// lookup semantics are defined there (presumably `None` when the parameter is
/// absent — confirm against that helper).
fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
    fakecloud_core::query::optional_query_param(req, name)
}
1856
/// Reads the mandatory request parameter `name` from `req`.
///
/// Thin local alias for `fakecloud_core::query::required_query_param`; the
/// error returned for a missing parameter is whatever that helper produces.
fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
    fakecloud_core::query::required_query_param(req, name)
}
1860
1861fn required_i32_param(req: &AwsRequest, name: &str) -> Result<i32, AwsServiceError> {
1862 let value = required_param(req, name)?;
1863 value.parse::<i32>().map_err(|_| {
1864 AwsServiceError::aws_error(
1865 StatusCode::BAD_REQUEST,
1866 "InvalidParameterValue",
1867 format!("Parameter {name} must be a valid integer."),
1868 )
1869 })
1870}
1871
1872fn optional_i32_param(req: &AwsRequest, name: &str) -> Result<Option<i32>, AwsServiceError> {
1873 optional_param(req, name)
1874 .map(|value| {
1875 value.parse::<i32>().map_err(|_| {
1876 AwsServiceError::aws_error(
1877 StatusCode::BAD_REQUEST,
1878 "InvalidParameterValue",
1879 format!("Parameter {name} must be a valid integer."),
1880 )
1881 })
1882 })
1883 .transpose()
1884}
1885
1886fn parse_tags(req: &AwsRequest) -> Result<Vec<RdsTag>, AwsServiceError> {
1887 let mut tags = Vec::new();
1888 for index in 1.. {
1889 let key_name = format!("Tags.Tag.{index}.Key");
1890 let value_name = format!("Tags.Tag.{index}.Value");
1891 let key = optional_param(req, &key_name);
1892 let value = optional_param(req, &value_name);
1893
1894 match (key, value) {
1895 (Some(key), Some(value)) => tags.push(RdsTag { key, value }),
1896 (None, None) => break,
1897 _ => {
1898 return Err(AwsServiceError::aws_error(
1899 StatusCode::BAD_REQUEST,
1900 "InvalidParameterValue",
1901 "Each tag must include both Key and Value.",
1902 ));
1903 }
1904 }
1905 }
1906
1907 Ok(tags)
1908}
1909
1910fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
1911 let mut keys = Vec::new();
1912 for index in 1.. {
1913 let key_name = format!("TagKeys.member.{index}");
1914 match optional_param(req, &key_name) {
1915 Some(key) => keys.push(key),
1916 None => break,
1917 }
1918 }
1919
1920 Ok(keys)
1921}
1922
1923fn parse_subnet_ids(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
1924 let mut subnet_ids = Vec::new();
1925 for index in 1.. {
1926 let subnet_id_name = format!("SubnetIds.SubnetIdentifier.{index}");
1927 match optional_param(req, &subnet_id_name) {
1928 Some(subnet_id) => subnet_ids.push(subnet_id),
1929 None => break,
1930 }
1931 }
1932
1933 Ok(subnet_ids)
1934}
1935
1936fn parse_vpc_security_group_ids(req: &AwsRequest) -> Vec<String> {
1937 let mut security_group_ids = Vec::new();
1938 for index in 1.. {
1939 let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
1940 match optional_param(req, &sg_id_name) {
1941 Some(sg_id) => security_group_ids.push(sg_id),
1942 None => break,
1943 }
1944 }
1945
1946 if security_group_ids.is_empty() {
1948 security_group_ids.push("sg-default".to_string());
1949 }
1950
1951 security_group_ids
1952}
1953
1954fn query_param_prefix_exists(req: &AwsRequest, prefix: &str) -> bool {
1955 req.query_params.keys().any(|key| key.starts_with(prefix))
1956}
1957
1958fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
1959 value
1960 .map(|raw| match raw {
1961 "true" | "True" | "TRUE" => Ok(true),
1962 "false" | "False" | "FALSE" => Ok(false),
1963 _ => Err(AwsServiceError::aws_error(
1964 StatusCode::BAD_REQUEST,
1965 "InvalidParameterValue",
1966 format!("Boolean parameter value '{raw}' is invalid."),
1967 )),
1968 })
1969 .transpose()
1970}
1971
/// One page of results as produced by `paginate`.
struct PaginationResult<T> {
    /// The items belonging to this page, in the order they were passed in.
    items: Vec<T>,
    /// Base64-encoded ID of the last item on this page when further items
    /// follow it; `None` when this page reaches the end of the input.
    next_marker: Option<String>,
}
1976
1977fn paginate<T, F>(
1978 mut items: Vec<T>,
1979 marker: Option<String>,
1980 max_records: Option<String>,
1981 get_id: F,
1982) -> Result<PaginationResult<T>, AwsServiceError>
1983where
1984 F: Fn(&T) -> &str,
1985{
1986 let max = if let Some(max_str) = max_records {
1988 let parsed = max_str.parse::<i32>().map_err(|_| {
1989 AwsServiceError::aws_error(
1990 StatusCode::BAD_REQUEST,
1991 "InvalidParameterValue",
1992 "MaxRecords must be a valid integer.",
1993 )
1994 })?;
1995 if !(1..=100).contains(&parsed) {
1996 return Err(AwsServiceError::aws_error(
1997 StatusCode::BAD_REQUEST,
1998 "InvalidParameterValue",
1999 "MaxRecords must be between 1 and 100.",
2000 ));
2001 }
2002 parsed as usize
2003 } else {
2004 100
2005 };
2006
2007 let start_id = if let Some(encoded_marker) = marker {
2009 let decoded = BASE64.decode(encoded_marker.as_bytes()).map_err(|_| {
2010 AwsServiceError::aws_error(
2011 StatusCode::BAD_REQUEST,
2012 "InvalidParameterValue",
2013 "Marker is invalid.",
2014 )
2015 })?;
2016 let id = String::from_utf8(decoded).map_err(|_| {
2017 AwsServiceError::aws_error(
2018 StatusCode::BAD_REQUEST,
2019 "InvalidParameterValue",
2020 "Marker is invalid.",
2021 )
2022 })?;
2023 Some(id)
2024 } else {
2025 None
2026 };
2027
2028 let start_index = if let Some(ref start_id) = start_id {
2030 items
2031 .iter()
2032 .position(|item| get_id(item) == start_id)
2033 .map(|pos| pos + 1) .unwrap_or(items.len()) } else {
2036 0
2037 };
2038
2039 let total_items = items.len();
2041 let end_index = std::cmp::min(start_index + max, total_items);
2042 let paginated_items: Vec<T> = items.drain(start_index..end_index).collect();
2043
2044 let next_marker = if end_index < total_items {
2046 paginated_items
2047 .last()
2048 .map(|item| BASE64.encode(get_id(item).as_bytes()))
2049 } else {
2050 None
2051 };
2052
2053 Ok(PaginationResult {
2054 items: paginated_items,
2055 next_marker,
2056 })
2057}
2058
2059fn validate_create_request(
2060 db_instance_identifier: &str,
2061 allocated_storage: i32,
2062 db_instance_class: &str,
2063 engine: &str,
2064 engine_version: &str,
2065 port: i32,
2066) -> Result<(), AwsServiceError> {
2067 if allocated_storage <= 0 {
2068 return Err(AwsServiceError::aws_error(
2069 StatusCode::BAD_REQUEST,
2070 "InvalidParameterValue",
2071 "AllocatedStorage must be greater than zero.",
2072 ));
2073 }
2074 if port <= 0 {
2075 return Err(AwsServiceError::aws_error(
2076 StatusCode::BAD_REQUEST,
2077 "InvalidParameterValue",
2078 "Port must be greater than zero.",
2079 ));
2080 }
2081 if !db_instance_identifier
2082 .chars()
2083 .all(|ch| ch.is_ascii_alphanumeric() || ch == '-')
2084 {
2085 return Err(AwsServiceError::aws_error(
2086 StatusCode::BAD_REQUEST,
2087 "InvalidParameterValue",
2088 "DBInstanceIdentifier must contain only alphanumeric characters or hyphens.",
2089 ));
2090 }
2091 let supported_engines = ["postgres", "mysql", "mariadb"];
2093 if !supported_engines.contains(&engine) {
2094 return Err(AwsServiceError::aws_error(
2095 StatusCode::BAD_REQUEST,
2096 "InvalidParameterValue",
2097 format!("Engine '{}' is not supported.", engine),
2098 ));
2099 }
2100
2101 let supported_versions = match engine {
2103 "postgres" => vec!["16.3", "15.5", "14.10", "13.13"],
2104 "mysql" => vec!["8.0.35", "8.0.28", "5.7.44"],
2105 "mariadb" => vec!["10.11.6", "10.6.16"],
2106 _ => vec![],
2107 };
2108
2109 if !supported_versions.contains(&engine_version) {
2110 return Err(AwsServiceError::aws_error(
2111 StatusCode::BAD_REQUEST,
2112 "InvalidParameterValue",
2113 format!("EngineVersion '{engine_version}' is not supported yet."),
2114 ));
2115 }
2116 validate_db_instance_class(db_instance_class)?;
2117 Ok(())
2118}
2119
2120fn validate_db_instance_class(db_instance_class: &str) -> Result<(), AwsServiceError> {
2121 if !crate::state::SUPPORTED_INSTANCE_CLASSES.contains(&db_instance_class) {
2122 return Err(AwsServiceError::aws_error(
2123 StatusCode::BAD_REQUEST,
2124 "InvalidParameterValue",
2125 format!("DBInstanceClass '{}' is not supported.", db_instance_class),
2126 ));
2127 }
2128 Ok(())
2129}
2130
2131fn filter_engine_versions(
2132 versions: &[EngineVersionInfo],
2133 engine: &Option<String>,
2134 engine_version: &Option<String>,
2135 family: &Option<String>,
2136) -> Vec<EngineVersionInfo> {
2137 versions
2138 .iter()
2139 .filter(|candidate| {
2140 engine
2141 .as_ref()
2142 .is_none_or(|expected| candidate.engine == *expected)
2143 })
2144 .filter(|candidate| {
2145 engine_version
2146 .as_ref()
2147 .is_none_or(|expected| candidate.engine_version == *expected)
2148 })
2149 .filter(|candidate| {
2150 family
2151 .as_ref()
2152 .is_none_or(|expected| candidate.db_parameter_group_family == *expected)
2153 })
2154 .cloned()
2155 .collect()
2156}
2157
2158fn filter_orderable_options(
2159 options: &[OrderableDbInstanceOption],
2160 engine: &Option<String>,
2161 engine_version: &Option<String>,
2162 db_instance_class: &Option<String>,
2163 license_model: &Option<String>,
2164 vpc: Option<bool>,
2165) -> Vec<OrderableDbInstanceOption> {
2166 options
2167 .iter()
2168 .filter(|candidate| {
2169 engine
2170 .as_ref()
2171 .is_none_or(|expected| candidate.engine == *expected)
2172 })
2173 .filter(|candidate| {
2174 engine_version
2175 .as_ref()
2176 .is_none_or(|expected| candidate.engine_version == *expected)
2177 })
2178 .filter(|candidate| {
2179 db_instance_class
2180 .as_ref()
2181 .is_none_or(|expected| candidate.db_instance_class == *expected)
2182 })
2183 .filter(|candidate| {
2184 license_model
2185 .as_ref()
2186 .is_none_or(|expected| candidate.license_model == *expected)
2187 })
2188 .filter(|_| vpc.unwrap_or(true))
2189 .cloned()
2190 .collect()
2191}
2192
2193#[allow(clippy::too_many_arguments)]
2197fn build_restored_instance(
2201 db_instance_identifier: &str,
2202 db_instance_arn: String,
2203 dbi_resource_id: String,
2204 created_at: chrono::DateTime<Utc>,
2205 vpc_security_group_ids: Vec<String>,
2206 snapshot: &DbSnapshot,
2207 running: &crate::runtime::RunningDbContainer,
2208) -> DbInstance {
2209 DbInstance {
2210 db_instance_identifier: db_instance_identifier.to_string(),
2211 db_instance_arn,
2212 db_instance_class: "db.t3.micro".to_string(),
2213 engine: snapshot.engine.clone(),
2214 engine_version: snapshot.engine_version.clone(),
2215 db_instance_status: "available".to_string(),
2216 master_username: snapshot.master_username.clone(),
2217 db_name: snapshot.db_name.clone(),
2218 endpoint_address: "127.0.0.1".to_string(),
2219 port: i32::from(running.host_port),
2220 allocated_storage: snapshot.allocated_storage,
2221 publicly_accessible: true,
2222 deletion_protection: false,
2223 created_at,
2224 dbi_resource_id,
2225 master_user_password: snapshot.master_user_password.clone(),
2226 container_id: running.container_id.clone(),
2227 host_port: running.host_port,
2228 tags: Vec::new(),
2229 read_replica_source_db_instance_identifier: None,
2230 read_replica_db_instance_identifiers: Vec::new(),
2231 vpc_security_group_ids,
2232 db_parameter_group_name: None,
2233 backup_retention_period: 1,
2234 preferred_backup_window: "03:00-04:00".to_string(),
2235 latest_restorable_time: Some(created_at),
2236 option_group_name: None,
2237 multi_az: false,
2238 pending_modified_values: None,
2239 }
2240}
2241
2242fn build_read_replica_instance(
2243 db_instance_identifier: &str,
2244 db_instance_arn: String,
2245 dbi_resource_id: String,
2246 created_at: chrono::DateTime<Utc>,
2247 source_db_instance_identifier: &str,
2248 source: &DbInstance,
2249 running: &crate::runtime::RunningDbContainer,
2250) -> DbInstance {
2251 DbInstance {
2252 db_instance_identifier: db_instance_identifier.to_string(),
2253 db_instance_arn,
2254 db_instance_class: source.db_instance_class.clone(),
2255 engine: source.engine.clone(),
2256 engine_version: source.engine_version.clone(),
2257 db_instance_status: "available".to_string(),
2258 master_username: source.master_username.clone(),
2259 db_name: source.db_name.clone(),
2260 endpoint_address: "127.0.0.1".to_string(),
2261 port: i32::from(running.host_port),
2262 allocated_storage: source.allocated_storage,
2263 publicly_accessible: source.publicly_accessible,
2264 deletion_protection: false,
2265 created_at,
2266 dbi_resource_id,
2267 master_user_password: source.master_user_password.clone(),
2268 container_id: running.container_id.clone(),
2269 host_port: running.host_port,
2270 tags: Vec::new(),
2271 read_replica_source_db_instance_identifier: Some(source_db_instance_identifier.to_string()),
2272 read_replica_db_instance_identifiers: Vec::new(),
2273 vpc_security_group_ids: source.vpc_security_group_ids.clone(),
2274 db_parameter_group_name: source.db_parameter_group_name.clone(),
2275 backup_retention_period: source.backup_retention_period,
2276 preferred_backup_window: source.preferred_backup_window.clone(),
2277 latest_restorable_time: if source.backup_retention_period > 0 {
2278 Some(created_at)
2279 } else {
2280 None
2281 },
2282 option_group_name: source.option_group_name.clone(),
2283 multi_az: source.multi_az,
2284 pending_modified_values: None,
2285 }
2286}
2287
2288fn xml_wrap(action: &str, inner: &str, request_id: &str) -> String {
2289 fakecloud_core::query::query_response_xml(action, RDS_NS, inner, request_id)
2290}
2291
2292fn engine_version_xml(version: &EngineVersionInfo) -> String {
2293 format!(
2294 "<DBEngineVersion>\
2295 <Engine>{}</Engine>\
2296 <EngineVersion>{}</EngineVersion>\
2297 <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2298 <DBEngineDescription>{}</DBEngineDescription>\
2299 <DBEngineVersionDescription>{}</DBEngineVersionDescription>\
2300 <Status>{}</Status>\
2301 </DBEngineVersion>",
2302 xml_escape(&version.engine),
2303 xml_escape(&version.engine_version),
2304 xml_escape(&version.db_parameter_group_family),
2305 xml_escape(&version.db_engine_description),
2306 xml_escape(&version.db_engine_version_description),
2307 xml_escape(&version.status),
2308 )
2309}
2310
2311fn orderable_option_xml(option: &OrderableDbInstanceOption) -> String {
2312 format!(
2313 "<OrderableDBInstanceOption>\
2314 <Engine>{}</Engine>\
2315 <EngineVersion>{}</EngineVersion>\
2316 <DBInstanceClass>{}</DBInstanceClass>\
2317 <LicenseModel>{}</LicenseModel>\
2318 <AvailabilityZones><AvailabilityZone><Name>us-east-1a</Name></AvailabilityZone></AvailabilityZones>\
2319 <MultiAZCapable>true</MultiAZCapable>\
2320 <ReadReplicaCapable>true</ReadReplicaCapable>\
2321 <Vpc>true</Vpc>\
2322 <SupportsStorageEncryption>true</SupportsStorageEncryption>\
2323 <StorageType>{}</StorageType>\
2324 <SupportsIops>false</SupportsIops>\
2325 <MinStorageSize>{}</MinStorageSize>\
2326 <MaxStorageSize>{}</MaxStorageSize>\
2327 <SupportsIAMDatabaseAuthentication>true</SupportsIAMDatabaseAuthentication>\
2328 </OrderableDBInstanceOption>",
2329 xml_escape(&option.engine),
2330 xml_escape(&option.engine_version),
2331 xml_escape(&option.db_instance_class),
2332 xml_escape(&option.license_model),
2333 xml_escape(&option.storage_type),
2334 option.min_storage_size,
2335 option.max_storage_size,
2336 )
2337}
2338
2339fn tag_xml(tag: &RdsTag) -> String {
2340 format!(
2341 "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2342 xml_escape(&tag.key),
2343 xml_escape(&tag.value),
2344 )
2345}
2346
2347fn db_instance_xml(instance: &DbInstance, status_override: Option<&str>) -> String {
2348 let status = status_override.unwrap_or(&instance.db_instance_status);
2349 let db_name_xml = instance
2350 .db_name
2351 .as_ref()
2352 .map(|db_name| format!("<DBName>{}</DBName>", xml_escape(db_name)))
2353 .unwrap_or_default();
2354
2355 let read_replica_source_xml = instance
2356 .read_replica_source_db_instance_identifier
2357 .as_ref()
2358 .map(|source| {
2359 format!(
2360 "<ReadReplicaSourceDBInstanceIdentifier>{}</ReadReplicaSourceDBInstanceIdentifier>",
2361 xml_escape(source)
2362 )
2363 })
2364 .unwrap_or_default();
2365
2366 let read_replica_identifiers_xml = if instance.read_replica_db_instance_identifiers.is_empty() {
2367 "<ReadReplicaDBInstanceIdentifiers/>".to_string()
2368 } else {
2369 format!(
2370 "<ReadReplicaDBInstanceIdentifiers>{}</ReadReplicaDBInstanceIdentifiers>",
2371 instance
2372 .read_replica_db_instance_identifiers
2373 .iter()
2374 .map(|id| format!(
2375 "<ReadReplicaDBInstanceIdentifier>{}</ReadReplicaDBInstanceIdentifier>",
2376 xml_escape(id)
2377 ))
2378 .collect::<String>()
2379 )
2380 };
2381
2382 let vpc_security_groups_xml = if instance.vpc_security_group_ids.is_empty() {
2383 "<VpcSecurityGroups/>".to_string()
2384 } else {
2385 format!(
2386 "<VpcSecurityGroups>{}</VpcSecurityGroups>",
2387 instance
2388 .vpc_security_group_ids
2389 .iter()
2390 .map(|sg_id| format!(
2391 "<VpcSecurityGroupMembership>\
2392 <VpcSecurityGroupId>{}</VpcSecurityGroupId>\
2393 <Status>active</Status>\
2394 </VpcSecurityGroupMembership>",
2395 xml_escape(sg_id)
2396 ))
2397 .collect::<String>()
2398 )
2399 };
2400
2401 let db_parameter_groups_xml = match &instance.db_parameter_group_name {
2402 Some(pg_name) => format!(
2403 "<DBParameterGroups>\
2404 <DBParameterGroup>\
2405 <DBParameterGroupName>{}</DBParameterGroupName>\
2406 <ParameterApplyStatus>in-sync</ParameterApplyStatus>\
2407 </DBParameterGroup>\
2408 </DBParameterGroups>",
2409 xml_escape(pg_name)
2410 ),
2411 None => "<DBParameterGroups/>".to_string(),
2412 };
2413
2414 let option_group_memberships_xml = match &instance.option_group_name {
2415 Some(og_name) => format!(
2416 "<OptionGroupMemberships>\
2417 <OptionGroupMembership>\
2418 <OptionGroupName>{}</OptionGroupName>\
2419 <Status>in-sync</Status>\
2420 </OptionGroupMembership>\
2421 </OptionGroupMemberships>",
2422 xml_escape(og_name)
2423 ),
2424 None => "<OptionGroupMemberships/>".to_string(),
2425 };
2426
2427 let pending_modified_values_xml = if let Some(ref pending) = instance.pending_modified_values {
2428 let mut fields = Vec::new();
2429 if let Some(ref class) = pending.db_instance_class {
2430 fields.push(format!(
2431 "<DBInstanceClass>{}</DBInstanceClass>",
2432 xml_escape(class)
2433 ));
2434 }
2435 if let Some(allocated_storage) = pending.allocated_storage {
2436 fields.push(format!(
2437 "<AllocatedStorage>{}</AllocatedStorage>",
2438 allocated_storage
2439 ));
2440 }
2441 if let Some(backup_retention_period) = pending.backup_retention_period {
2442 fields.push(format!(
2443 "<BackupRetentionPeriod>{}</BackupRetentionPeriod>",
2444 backup_retention_period
2445 ));
2446 }
2447 if let Some(multi_az) = pending.multi_az {
2448 fields.push(format!(
2449 "<MultiAZ>{}</MultiAZ>",
2450 if multi_az { "true" } else { "false" }
2451 ));
2452 }
2453 if let Some(ref engine_version) = pending.engine_version {
2454 fields.push(format!(
2455 "<EngineVersion>{}</EngineVersion>",
2456 xml_escape(engine_version)
2457 ));
2458 }
2459 if pending.master_user_password.is_some() {
2460 fields.push("<MasterUserPassword>****</MasterUserPassword>".to_string());
2461 }
2462 if !fields.is_empty() {
2463 format!(
2464 "<PendingModifiedValues>{}</PendingModifiedValues>",
2465 fields.join("")
2466 )
2467 } else {
2468 String::new()
2469 }
2470 } else {
2471 String::new()
2472 };
2473
2474 let latest_restorable_time_xml = instance
2475 .latest_restorable_time
2476 .map(|t| {
2477 format!(
2478 "<LatestRestorableTime>{}</LatestRestorableTime>",
2479 t.to_rfc3339()
2480 )
2481 })
2482 .unwrap_or_default();
2483
2484 format!(
2485 "<DBInstanceIdentifier>{identifier}</DBInstanceIdentifier>\
2486 <DBInstanceClass>{class}</DBInstanceClass>\
2487 <Engine>{engine}</Engine>\
2488 <DBInstanceStatus>{status}</DBInstanceStatus>\
2489 <MasterUsername>{master_username}</MasterUsername>\
2490 {db_name_xml}\
2491 <Endpoint><Address>{endpoint_address}</Address><Port>{port}</Port></Endpoint>\
2492 <AllocatedStorage>{allocated_storage}</AllocatedStorage>\
2493 <InstanceCreateTime>{create_time}</InstanceCreateTime>\
2494 <PreferredBackupWindow>{preferred_backup_window}</PreferredBackupWindow>\
2495 <BackupRetentionPeriod>{backup_retention_period}</BackupRetentionPeriod>\
2496 <DBSecurityGroups/>\
2497 {vpc_security_groups_xml}\
2498 {db_parameter_groups_xml}\
2499 <AvailabilityZone>us-east-1a</AvailabilityZone>\
2500 {latest_restorable_time_xml}\
2501 <PreferredMaintenanceWindow>sun:00:00-sun:00:30</PreferredMaintenanceWindow>\
2502 <MultiAZ>{multi_az}</MultiAZ>\
2503 <EngineVersion>{engine_version}</EngineVersion>\
2504 <AutoMinorVersionUpgrade>true</AutoMinorVersionUpgrade>\
2505 {read_replica_identifiers_xml}\
2506 {read_replica_source_xml}\
2507 <LicenseModel>{license_model}</LicenseModel>\
2508 {option_group_memberships_xml}\
2509 <PubliclyAccessible>{publicly_accessible}</PubliclyAccessible>\
2510 <StorageType>gp2</StorageType>\
2511 <DbInstancePort>{port}</DbInstancePort>\
2512 <StorageEncrypted>false</StorageEncrypted>\
2513 <DbiResourceId>{dbi_resource_id}</DbiResourceId>\
2514 <DeletionProtection>{deletion_protection}</DeletionProtection>\
2515 {pending_modified_values_xml}\
2516 <DBInstanceArn>{arn}</DBInstanceArn>",
2517 identifier = xml_escape(&instance.db_instance_identifier),
2518 class = xml_escape(&instance.db_instance_class),
2519 engine = xml_escape(&instance.engine),
2520 status = xml_escape(status),
2521 master_username = xml_escape(&instance.master_username),
2522 endpoint_address = xml_escape(&instance.endpoint_address),
2523 port = instance.port,
2524 allocated_storage = instance.allocated_storage,
2525 create_time = instance.created_at.to_rfc3339(),
2526 preferred_backup_window = xml_escape(&instance.preferred_backup_window),
2527 backup_retention_period = instance.backup_retention_period,
2528 multi_az = if instance.multi_az { "true" } else { "false" },
2529 engine_version = xml_escape(&instance.engine_version),
2530 license_model = license_model_for_engine(&instance.engine),
2531 publicly_accessible = if instance.publicly_accessible {
2532 "true"
2533 } else {
2534 "false"
2535 },
2536 dbi_resource_id = xml_escape(&instance.dbi_resource_id),
2537 deletion_protection = if instance.deletion_protection {
2538 "true"
2539 } else {
2540 "false"
2541 },
2542 arn = xml_escape(&instance.db_instance_arn),
2543 )
2544}
2545
2546fn db_snapshot_xml(snapshot: &DbSnapshot) -> String {
2547 format!(
2548 "<DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\
2549 <DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2550 <SnapshotCreateTime>{}</SnapshotCreateTime>\
2551 <Engine>{}</Engine>\
2552 <EngineVersion>{}</EngineVersion>\
2553 <AllocatedStorage>{}</AllocatedStorage>\
2554 <Status>{}</Status>\
2555 <Port>{}</Port>\
2556 <MasterUsername>{}</MasterUsername>\
2557 {}\
2558 <DbiResourceId>{}</DbiResourceId>\
2559 <SnapshotType>{}</SnapshotType>\
2560 <DBSnapshotArn>{}</DBSnapshotArn>",
2561 xml_escape(&snapshot.db_snapshot_identifier),
2562 xml_escape(&snapshot.db_instance_identifier),
2563 snapshot.snapshot_create_time.to_rfc3339(),
2564 xml_escape(&snapshot.engine),
2565 xml_escape(&snapshot.engine_version),
2566 snapshot.allocated_storage,
2567 xml_escape(&snapshot.status),
2568 snapshot.port,
2569 xml_escape(&snapshot.master_username),
2570 snapshot
2571 .db_name
2572 .as_ref()
2573 .map(|name| format!("<DBName>{}</DBName>", xml_escape(name)))
2574 .unwrap_or_default(),
2575 xml_escape(&snapshot.dbi_resource_id),
2576 xml_escape(&snapshot.snapshot_type),
2577 xml_escape(&snapshot.db_snapshot_arn),
2578 )
2579}
2580
2581fn db_subnet_group_xml(subnet_group: &DbSubnetGroup) -> String {
2582 let subnets_xml = subnet_group
2583 .subnet_ids
2584 .iter()
2585 .zip(&subnet_group.subnet_availability_zones)
2586 .map(|(subnet_id, az)| {
2587 format!(
2588 "<Subnet>\
2589 <SubnetIdentifier>{}</SubnetIdentifier>\
2590 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
2591 <SubnetStatus>Active</SubnetStatus>\
2592 </Subnet>",
2593 xml_escape(subnet_id),
2594 xml_escape(az)
2595 )
2596 })
2597 .collect::<String>();
2598
2599 format!(
2600 "<DBSubnetGroupName>{}</DBSubnetGroupName>\
2601 <DBSubnetGroupDescription>{}</DBSubnetGroupDescription>\
2602 <VpcId>{}</VpcId>\
2603 <SubnetGroupStatus>Complete</SubnetGroupStatus>\
2604 <Subnets>{}</Subnets>\
2605 <DBSubnetGroupArn>{}</DBSubnetGroupArn>",
2606 xml_escape(&subnet_group.db_subnet_group_name),
2607 xml_escape(&subnet_group.db_subnet_group_description),
2608 xml_escape(&subnet_group.vpc_id),
2609 subnets_xml,
2610 xml_escape(&subnet_group.db_subnet_group_arn),
2611 )
2612}
2613
2614fn db_parameter_group_xml(parameter_group: &DbParameterGroup) -> String {
2615 format!(
2616 "<DBParameterGroupName>{}</DBParameterGroupName>\
2617 <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2618 <Description>{}</Description>\
2619 <DBParameterGroupArn>{}</DBParameterGroupArn>",
2620 xml_escape(¶meter_group.db_parameter_group_name),
2621 xml_escape(¶meter_group.db_parameter_group_family),
2622 xml_escape(¶meter_group.description),
2623 xml_escape(¶meter_group.db_parameter_group_arn),
2624 )
2625}
2626
2627fn db_instance_not_found(identifier: &str) -> AwsServiceError {
2628 AwsServiceError::aws_error(
2629 StatusCode::NOT_FOUND,
2630 "DBInstanceNotFound",
2631 format!("DBInstance {} not found.", identifier),
2632 )
2633}
2634
2635fn db_snapshot_not_found(identifier: &str) -> AwsServiceError {
2636 AwsServiceError::aws_error(
2637 StatusCode::NOT_FOUND,
2638 "DBSnapshotNotFound",
2639 format!("DBSnapshot {} not found.", identifier),
2640 )
2641}
2642
2643fn db_instance_not_found_by_arn(resource_name: &str) -> AwsServiceError {
2644 AwsServiceError::aws_error(
2645 StatusCode::NOT_FOUND,
2646 "DBInstanceNotFound",
2647 format!("DBInstance {resource_name} not found."),
2648 )
2649}
2650
2651fn find_instance_by_arn<'a>(
2652 state: &'a crate::state::RdsState,
2653 resource_name: &str,
2654) -> Result<&'a DbInstance, AwsServiceError> {
2655 state
2656 .instances
2657 .values()
2658 .find(|instance| instance.db_instance_arn == resource_name)
2659 .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2660}
2661
2662fn find_instance_by_arn_mut<'a>(
2663 state: &'a mut crate::state::RdsState,
2664 resource_name: &str,
2665) -> Result<&'a mut DbInstance, AwsServiceError> {
2666 state
2667 .instances
2668 .values_mut()
2669 .find(|instance| instance.db_instance_arn == resource_name)
2670 .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
2671}
2672
2673fn merge_tags(existing: &mut Vec<RdsTag>, incoming: &[RdsTag]) {
2674 for tag in incoming {
2675 if let Some(existing_tag) = existing
2676 .iter_mut()
2677 .find(|candidate| candidate.key == tag.key)
2678 {
2679 existing_tag.value = tag.value.clone();
2680 } else {
2681 existing.push(tag.clone());
2682 }
2683 }
2684}
2685
/// Returns the license model string reported for `engine`.
///
/// `mysql` and `mariadb` map to `general-public-license`; every other engine
/// name maps to `postgresql-license`.
fn license_model_for_engine(engine: &str) -> &'static str {
    if matches!(engine, "mysql" | "mariadb") {
        "general-public-license"
    } else {
        "postgresql-license"
    }
}
2692
/// Returns the database name used when none is supplied for `engine`.
///
/// `mysql` and `mariadb` map to `mysql`; every other engine name maps to
/// `postgres`.
fn default_db_name(engine: &str) -> &'static str {
    let is_mysql_family = engine == "mysql" || engine == "mariadb";
    if is_mysql_family {
        return "mysql";
    }
    "postgres"
}
2699
/// Returns the default listening port for `engine`.
///
/// `mysql` and `mariadb` use 3306; `postgres` and any other engine name
/// use 5432.
fn default_port_for_engine(engine: &str) -> i32 {
    const POSTGRES_PORT: i32 = 5432;
    const MYSQL_PORT: i32 = 3306;

    match engine {
        "mysql" | "mariadb" => MYSQL_PORT,
        // `postgres` and unknown engines share the same default.
        _ => POSTGRES_PORT,
    }
}
2709
/// Returns the name of the default parameter group for an engine/version pair.
///
/// * `postgres`: `default.postgres<major>`, where `<major>` is the text before
///   the first `.` of `engine_version`, or `16` when that text is empty.
/// * `mysql`: `default.mysql5.7` for versions starting with `5.7`, otherwise
///   `default.mysql8.0`.
/// * `mariadb`: `default.mariadb10.11` for versions starting with `10.11`,
///   otherwise `default.mariadb10.6`.
/// * any other engine: `default.postgres16`.
fn default_parameter_group(engine: &str, engine_version: &str) -> String {
    match engine {
        "postgres" => {
            // `split` always yields at least one (possibly empty) piece, so the
            // fallback has to be triggered by emptiness rather than by `None`.
            let major = engine_version
                .split('.')
                .next()
                .filter(|major| !major.is_empty())
                .unwrap_or("16");
            format!("default.postgres{major}")
        }
        "mysql" => {
            let major = if engine_version.starts_with("5.7") {
                "5.7"
            } else {
                "8.0"
            };
            format!("default.mysql{major}")
        }
        "mariadb" => {
            let major = if engine_version.starts_with("10.11") {
                "10.11"
            } else {
                "10.6"
            };
            format!("default.mariadb{major}")
        }
        _ => "default.postgres16".to_string(),
    }
}
2739
2740fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
2741 match error {
2742 RuntimeError::Unavailable => AwsServiceError::aws_error(
2743 StatusCode::SERVICE_UNAVAILABLE,
2744 "InvalidParameterValue",
2745 "Docker/Podman is required for RDS DB instances but is not available",
2746 ),
2747 RuntimeError::ContainerStartFailed(message) => AwsServiceError::aws_error(
2748 StatusCode::INTERNAL_SERVER_ERROR,
2749 "InternalFailure",
2750 message,
2751 ),
2752 }
2753}
2754
2755#[cfg(test)]
2756mod tests {
2757 use std::collections::HashMap;
2758 use std::sync::Arc;
2759
2760 use bytes::Bytes;
2761 use chrono::Utc;
2762 use http::{HeaderMap, Method};
2763 use parking_lot::RwLock;
2764 use uuid::Uuid;
2765
2766 use super::{
2767 db_instance_xml, filter_engine_versions, filter_orderable_options, merge_tags,
2768 optional_i32_param, parse_tag_keys, parse_tags, validate_create_request, RdsService,
2769 };
2770 use crate::state::{default_engine_versions, default_orderable_options, DbInstance, RdsTag};
2771 use fakecloud_core::service::{AwsRequest, AwsService, AwsServiceError};
2772
// Filtering the default catalog by engine alone must return only that
// engine's entries; four postgres versions are expected.
#[test]
fn filter_engine_versions_matches_requested_engine() {
    let engine_filter = Some("postgres".to_string());
    let catalog = default_engine_versions();

    let matched = filter_engine_versions(&catalog, &engine_filter, &None, &None);

    assert_eq!(matched.len(), 4);
    assert!(matched.iter().all(|v| v.engine == "postgres"));
}
2783
// Engine, version and instance class together must narrow the default
// orderable options down to a single entry of the requested class.
#[test]
fn filter_orderable_options_respects_instance_class() {
    let engine = Some("postgres".to_string());
    let engine_version = Some("16.3".to_string());
    let instance_class = Some("db.t3.micro".to_string());
    let catalog = default_orderable_options();

    let matched = filter_orderable_options(
        &catalog,
        &engine,
        &engine_version,
        &instance_class,
        &None,
        Some(true),
    );

    assert_eq!(matched.len(), 1);
    assert_eq!(matched[0].db_instance_class, "db.t3.micro");
}
2800
// A request that fails validation must surface `InvalidParameterValue`.
//
// NOTE(review): `mysql` has entries in the supported-version table of
// `validate_create_request`, so this input may be rejected for the
// engine/version mismatch ("16.3") rather than for the engine itself — confirm.
#[test]
fn validate_create_request_rejects_unsupported_engine() {
    let outcome = validate_create_request("test-db", 20, "db.t3.micro", "mysql", "16.3", 5432);

    let rejection = outcome.expect_err("unsupported engine");

    assert_eq!(rejection.code(), "InvalidParameterValue");
}
2808
// A non-numeric value for an integer parameter must be reported as
// `InvalidParameterValue`.
#[test]
fn optional_i32_param_rejects_invalid_integer() {
    let req = request("CreateDBInstance", &[("Port", "not-a-number")]);

    let parse_failure = optional_i32_param(&req, "Port").expect_err("invalid port");

    assert_eq!(parse_failure.code(), "InvalidParameterValue");
}
2817
// The rendered fragment must contain the identifier, the overriding status
// (not the stored one) and the endpoint address/port pair.
#[test]
fn db_instance_xml_renders_endpoint_and_status() {
    let now = Utc::now();
    let resource_id = format!("db-{}", Uuid::new_v4().simple());
    let fixture = DbInstance {
        db_instance_identifier: "test-db".to_string(),
        db_instance_arn: "arn:aws:rds:us-east-1:123456789012:db:test-db".to_string(),
        db_instance_class: "db.t3.micro".to_string(),
        engine: "postgres".to_string(),
        engine_version: "16.3".to_string(),
        db_instance_status: "available".to_string(),
        master_username: "admin".to_string(),
        db_name: Some("appdb".to_string()),
        endpoint_address: "127.0.0.1".to_string(),
        port: 15432,
        allocated_storage: 20,
        publicly_accessible: true,
        deletion_protection: false,
        created_at: now,
        dbi_resource_id: resource_id,
        master_user_password: "secret123".to_string(),
        container_id: "container".to_string(),
        host_port: 15432,
        tags: Vec::new(),
        read_replica_source_db_instance_identifier: None,
        read_replica_db_instance_identifiers: Vec::new(),
        vpc_security_group_ids: vec!["sg-12345678".to_string()],
        db_parameter_group_name: Some("default.postgres16".to_string()),
        backup_retention_period: 1,
        preferred_backup_window: "03:00-04:00".to_string(),
        latest_restorable_time: Some(now),
        option_group_name: None,
        multi_az: false,
        pending_modified_values: None,
    };

    let rendered = db_instance_xml(&fixture, Some("creating"));

    let expected_fragments = [
        "<DBInstanceIdentifier>test-db</DBInstanceIdentifier>",
        "<DBInstanceStatus>creating</DBInstanceStatus>",
        "<Address>127.0.0.1</Address><Port>15432</Port>",
    ];
    for fragment in expected_fragments {
        assert!(rendered.contains(fragment));
    }
}
2859
// Tags supplied as `Tags.Tag.N.Key` / `Tags.Tag.N.Value` pairs must be parsed
// into `RdsTag`s in index order.
#[test]
fn parse_tags_reads_rds_query_shape() {
    let params = [
        ("Tags.Tag.1.Key", "env"),
        ("Tags.Tag.1.Value", "dev"),
        ("Tags.Tag.2.Key", "team"),
        ("Tags.Tag.2.Value", "core"),
    ];
    let req = request("AddTagsToResource", &params);

    let parsed = parse_tags(&req).expect("tags");

    let expected: Vec<RdsTag> = [("env", "dev"), ("team", "core")]
        .iter()
        .map(|(key, value)| RdsTag {
            key: key.to_string(),
            value: value.to_string(),
        })
        .collect();
    assert_eq!(parsed, expected);
}
2888
// Tag keys supplied as `TagKeys.member.N` must be parsed in index order.
#[test]
fn parse_tag_keys_reads_member_shape() {
    let params = [("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")];
    let req = request("RemoveTagsFromResource", &params);

    let parsed = parse_tag_keys(&req).expect("tag keys");

    let expected: Vec<String> = ["env", "team"].iter().map(|key| key.to_string()).collect();
    assert_eq!(parsed, expected);
}
2900
// Merging must overwrite the value of an existing key in place and append
// tags whose keys are new.
#[test]
fn merge_tags_updates_existing_values() {
    let tag = |key: &str, value: &str| RdsTag {
        key: key.to_string(),
        value: value.to_string(),
    };
    let mut current = vec![tag("env", "dev")];
    let updates = [tag("env", "prod"), tag("team", "core")];

    merge_tags(&mut current, &updates);

    assert_eq!(current.len(), 2);
    assert_eq!(current[0].value, "prod");
    assert_eq!(current[1].key, "team");
}
2926
2927 #[tokio::test]
2928 async fn describe_engine_versions_returns_xml_body() {
2929 let service = RdsService::new(Arc::new(RwLock::new(
2930 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
2931 )));
2932 let request = request("DescribeDBEngineVersions", &[("Engine", "postgres")]);
2933
2934 let response = service.handle(request).await.expect("response");
2935 let body = String::from_utf8(response.body.expect_bytes().to_vec()).expect("utf8");
2936
2937 assert!(body.contains("<DescribeDBEngineVersionsResponse"));
2938 assert!(body.contains("<Engine>postgres</Engine>"));
2939 assert!(body.contains("<DBParameterGroupFamily>postgres16</DBParameterGroupFamily>"));
2940 }
2941
2942 fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
2943 let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
2944 for (key, value) in params {
2945 query_params.insert((*key).to_string(), (*value).to_string());
2946 }
2947
2948 AwsRequest {
2949 service: "rds".to_string(),
2950 action: action.to_string(),
2951 region: "us-east-1".to_string(),
2952 account_id: "123456789012".to_string(),
2953 request_id: "test-request-id".to_string(),
2954 headers: HeaderMap::new(),
2955 query_params,
2956 body: Bytes::new(),
2957 path_segments: vec![],
2958 raw_path: "/".to_string(),
2959 raw_query: String::new(),
2960 method: Method::POST,
2961 is_query_protocol: true,
2962 access_key_id: None,
2963 principal: None,
2964 }
2965 }
2966
2967 fn make_service() -> RdsService {
2970 RdsService::new(Arc::new(RwLock::new(
2971 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
2972 )))
2973 }
2974
2975 fn body_of(resp: fakecloud_core::service::AwsResponse) -> String {
2976 String::from_utf8(resp.body.expect_bytes().to_vec()).expect("utf8")
2977 }
2978
2979 fn seed_instance(svc: &RdsService, identifier: &str) -> String {
2980 let arn = format!("arn:aws:rds:us-east-1:123456789012:db:{identifier}");
2981 let mut accounts = svc.state.write();
2982 let state = accounts.default_mut();
2983 state.instances.insert(
2984 identifier.to_string(),
2985 DbInstance {
2986 db_instance_identifier: identifier.to_string(),
2987 db_instance_arn: arn.clone(),
2988 db_instance_class: "db.t3.micro".to_string(),
2989 engine: "postgres".to_string(),
2990 engine_version: "16.3".to_string(),
2991 db_instance_status: "available".to_string(),
2992 master_username: "admin".to_string(),
2993 db_name: Some("appdb".to_string()),
2994 endpoint_address: "127.0.0.1".to_string(),
2995 port: 15432,
2996 allocated_storage: 20,
2997 publicly_accessible: true,
2998 deletion_protection: false,
2999 created_at: Utc::now(),
3000 dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3001 master_user_password: "secret".to_string(),
3002 container_id: "container".to_string(),
3003 host_port: 15432,
3004 tags: Vec::new(),
3005 read_replica_source_db_instance_identifier: None,
3006 read_replica_db_instance_identifiers: Vec::new(),
3007 vpc_security_group_ids: vec!["sg-12345678".to_string()],
3008 db_parameter_group_name: Some("default.postgres16".to_string()),
3009 backup_retention_period: 1,
3010 preferred_backup_window: "03:00-04:00".to_string(),
3011 latest_restorable_time: None,
3012 option_group_name: None,
3013 multi_az: false,
3014 pending_modified_values: None,
3015 },
3016 );
3017 arn
3018 }
3019
3020 fn assert_code<T>(result: Result<T, AwsServiceError>, expected_code: &str) -> AwsServiceError {
3021 match result {
3022 Ok(_) => panic!("expected error {expected_code}, got Ok"),
3023 Err(e) => {
3024 assert_eq!(e.code(), expected_code, "wrong error code");
3025 e
3026 }
3027 }
3028 }
3029
// `AddTagsToResource` without `ResourceName` must fail with `MissingParameter`.
#[test]
fn add_tags_requires_resource_name() {
    let svc = make_service();
    let empty_request = request("AddTagsToResource", &[]);

    let outcome = svc.add_tags_to_resource(&empty_request);

    assert_code(outcome, "MissingParameter");
}
3038
// `AddTagsToResource` with a valid resource but no tags must fail with
// `MissingParameter`.
#[test]
fn add_tags_requires_at_least_one_tag() {
    let svc = make_service();
    let arn = seed_instance(&svc, "db1");
    let params = [("ResourceName", arn.as_str())];
    let untagged_request = request("AddTagsToResource", &params);

    let outcome = svc.add_tags_to_resource(&untagged_request);

    assert_code(outcome, "MissingParameter");
}
3046
// A tag added through `AddTagsToResource` must be visible in the
// `ListTagsForResource` response for the same ARN.
#[test]
fn add_tags_appends_then_list_tags_returns_them() {
    let svc = make_service();
    let arn = seed_instance(&svc, "db1");

    let add_params = [
        ("ResourceName", arn.as_str()),
        ("Tags.Tag.1.Key", "env"),
        ("Tags.Tag.1.Value", "dev"),
    ];
    svc.add_tags_to_resource(&request("AddTagsToResource", &add_params))
        .unwrap();

    let list_params = [("ResourceName", arn.as_str())];
    let listing = svc
        .list_tags_for_resource(&request("ListTagsForResource", &list_params))
        .unwrap();
    let xml = body_of(listing);

    assert!(xml.contains("<Key>env</Key>"));
    assert!(xml.contains("<Value>dev</Value>"));
}
3066
// `ListTagsForResource` must reject a request carrying a `Filters` parameter
// with `InvalidParameterValue`.
#[test]
fn list_tags_rejects_filters_param() {
    let svc = make_service();
    let arn = seed_instance(&svc, "db1");
    let params = [
        ("ResourceName", arn.as_str()),
        ("Filters.Filter.1.Name", "x"),
    ];
    let filtered_request = request("ListTagsForResource", &params);

    let outcome = svc.list_tags_for_resource(&filtered_request);

    assert_code(outcome, "InvalidParameterValue");
}
3080
// `ListTagsForResource` for an ARN that matches no instance must fail with
// `DBInstanceNotFound`.
#[test]
fn list_tags_unknown_arn_errors() {
    let svc = make_service();
    let params = [("ResourceName", "arn:aws:rds:us-east-1:123456789012:db:nope")];
    let unknown_request = request("ListTagsForResource", &params);

    let outcome = svc.list_tags_for_resource(&unknown_request);

    assert_code(outcome, "DBInstanceNotFound");
}
3090
// `RemoveTagsFromResource` must delete only the listed keys and leave the
// remaining tags untouched.
#[test]
fn remove_tags_strips_only_listed_keys() {
    let svc = make_service();
    let arn = seed_instance(&svc, "db1");

    // Seed two tags directly; the write lock is released at the end of the block.
    {
        let mut accounts = svc.state.write();
        let instance = accounts.default_mut().instances.get_mut("db1").unwrap();
        instance.tags = vec![
            RdsTag {
                key: "env".to_string(),
                value: "dev".to_string(),
            },
            RdsTag {
                key: "team".to_string(),
                value: "core".to_string(),
            },
        ];
    }

    let params = [("ResourceName", arn.as_str()), ("TagKeys.member.1", "env")];
    svc.remove_tags_from_resource(&request("RemoveTagsFromResource", &params))
        .unwrap();

    let accounts = svc.state.read();
    let remaining = &accounts.default_ref().instances.get("db1").unwrap().tags;
    assert_eq!(remaining.len(), 1);
    assert_eq!(remaining[0].key, "team");
}
3122
// `RemoveTagsFromResource` without any tag keys must fail with
// `MissingParameter`.
#[test]
fn remove_tags_requires_keys() {
    let svc = make_service();
    let arn = seed_instance(&svc, "db1");
    let params = [("ResourceName", arn.as_str())];
    let keyless_request = request("RemoveTagsFromResource", &params);

    let outcome = svc.remove_tags_from_resource(&keyless_request);

    assert_code(outcome, "MissingParameter");
}
3130
3131 fn create_subnet_group(svc: &RdsService, name: &str) {
3134 let req = request(
3135 "CreateDBSubnetGroup",
3136 &[
3137 ("DBSubnetGroupName", name),
3138 ("DBSubnetGroupDescription", "test"),
3139 ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3140 ("SubnetIds.SubnetIdentifier.2", "subnet-bbb"),
3141 ],
3142 );
3143 svc.create_db_subnet_group(&req).unwrap();
3144 }
3145
// Creating a subnet group with a single subnet must fail with
// `DBSubnetGroupDoesNotCoverEnoughAZs`.
#[test]
fn create_db_subnet_group_requires_two_subnets() {
    let svc = make_service();
    let params = [
        ("DBSubnetGroupName", "sg1"),
        ("DBSubnetGroupDescription", "t"),
        ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
    ];
    let single_subnet_request = request("CreateDBSubnetGroup", &params);

    let outcome = svc.create_db_subnet_group(&single_subnet_request);

    assert_code(outcome, "DBSubnetGroupDoesNotCoverEnoughAZs");
}
3162
// Creating a subnet group without any subnet ids must fail with
// `InvalidParameterValue`.
#[test]
fn create_db_subnet_group_rejects_empty_subnets() {
    let svc = make_service();
    let params = [
        ("DBSubnetGroupName", "sg1"),
        ("DBSubnetGroupDescription", "t"),
    ];
    let no_subnets_request = request("CreateDBSubnetGroup", &params);

    let outcome = svc.create_db_subnet_group(&no_subnets_request);

    assert_code(outcome, "InvalidParameterValue");
}
3175
/// Expects a second `CreateDBSubnetGroup` with an already-used name to fail
/// with `DBSubnetGroupAlreadyExists`.
#[test]
fn create_db_subnet_group_rejects_duplicates() {
    let service = make_service();
    // First creation establishes the name that the second call collides with.
    create_subnet_group(&service, "sg1");
    let params = [
        ("DBSubnetGroupName", "sg1"),
        ("DBSubnetGroupDescription", "t"),
        ("SubnetIds.SubnetIdentifier.1", "subnet-x"),
        ("SubnetIds.SubnetIdentifier.2", "subnet-y"),
    ];
    let call = request("CreateDBSubnetGroup", &params);
    let outcome = service.create_db_subnet_group(&call);
    assert_code(outcome, "DBSubnetGroupAlreadyExists");
}
3194
/// Expects `DescribeDBSubnetGroups` to return only the named group when
/// `DBSubnetGroupName` is given, and every group when it is omitted.
#[test]
fn describe_db_subnet_groups_by_name_or_list() {
    let service = make_service();
    create_subnet_group(&service, "sg-alpha");
    create_subnet_group(&service, "sg-beta");

    // Lookup by name: only the requested group may appear.
    let named_params = [("DBSubnetGroupName", "sg-alpha")];
    let named_call = request("DescribeDBSubnetGroups", &named_params);
    let named_response = service.describe_db_subnet_groups(&named_call).unwrap();
    let named_xml = body_of(named_response);
    assert!(named_xml.contains("sg-alpha"));
    assert!(!named_xml.contains("sg-beta"));

    // Unfiltered listing: both groups must appear.
    let listing_call = request("DescribeDBSubnetGroups", &[]);
    let listing_response = service.describe_db_subnet_groups(&listing_call).unwrap();
    let listing_xml = body_of(listing_response);
    assert!(listing_xml.contains("sg-alpha"));
    assert!(listing_xml.contains("sg-beta"));
}
3214
/// Expects describing a subnet group that was never created to fail with
/// `DBSubnetGroupNotFoundFault`.
#[test]
fn describe_db_subnet_groups_unknown_name_errors() {
    let service = make_service();
    let params = [("DBSubnetGroupName", "ghost")];
    let call = request("DescribeDBSubnetGroups", &params);
    let outcome = service.describe_db_subnet_groups(&call);
    assert_code(outcome, "DBSubnetGroupNotFoundFault");
}
3224
/// Expects deleting a subnet group that was never created to fail with
/// `DBSubnetGroupNotFoundFault`.
#[test]
fn delete_db_subnet_group_unknown_errors() {
    let service = make_service();
    let params = [("DBSubnetGroupName", "ghost")];
    let call = request("DeleteDBSubnetGroup", &params);
    let outcome = service.delete_db_subnet_group(&call);
    assert_code(outcome, "DBSubnetGroupNotFoundFault");
}
3234
/// Expects `DeleteDBSubnetGroup` to leave the default state's `subnet_groups`
/// map empty after removing the only group.
#[test]
fn delete_db_subnet_group_removes_entry() {
    let service = make_service();
    create_subnet_group(&service, "sg1");
    let params = [("DBSubnetGroupName", "sg1")];
    let call = request("DeleteDBSubnetGroup", &params);
    service.delete_db_subnet_group(&call).unwrap();

    let guard = service.state.read();
    assert!(guard.default_ref().subnet_groups.is_empty());
}
3243
/// Expects `ModifyDBSubnetGroup` to replace the stored subnet identifiers
/// with the ones supplied in the request.
#[test]
fn modify_db_subnet_group_updates_subnet_ids() {
    let service = make_service();
    create_subnet_group(&service, "sg1");
    let params = [
        ("DBSubnetGroupName", "sg1"),
        ("SubnetIds.SubnetIdentifier.1", "subnet-new1"),
        ("SubnetIds.SubnetIdentifier.2", "subnet-new2"),
    ];
    let call = request("ModifyDBSubnetGroup", &params);
    service.modify_db_subnet_group(&call).unwrap();

    let guard = service.state.read();
    let group = guard.default_ref().subnet_groups.get("sg1").unwrap();
    assert_eq!(group.subnet_ids, vec!["subnet-new1", "subnet-new2"]);
}
3263
3264 fn create_param_group(svc: &RdsService, name: &str) {
3267 let req = request(
3268 "CreateDBParameterGroup",
3269 &[
3270 ("DBParameterGroupName", name),
3271 ("DBParameterGroupFamily", "postgres16"),
3272 ("Description", "test"),
3273 ],
3274 );
3275 svc.create_db_parameter_group(&req).unwrap();
3276 }
3277
/// Expects a parameter group in the "oracle19" family to be rejected with
/// `InvalidParameterValue`.
#[test]
fn create_db_parameter_group_rejects_unknown_family() {
    let service = make_service();
    let params = [
        ("DBParameterGroupName", "pg1"),
        ("DBParameterGroupFamily", "oracle19"),
        ("Description", "t"),
    ];
    let call = request("CreateDBParameterGroup", &params);
    let outcome = service.create_db_parameter_group(&call);
    assert_code(outcome, "InvalidParameterValue");
}
3291
/// Expects a second `CreateDBParameterGroup` with an already-used name to
/// fail with `DBParameterGroupAlreadyExists`.
#[test]
fn create_db_parameter_group_rejects_duplicates() {
    let service = make_service();
    // First creation establishes the name that the second call collides with.
    create_param_group(&service, "pg1");
    let params = [
        ("DBParameterGroupName", "pg1"),
        ("DBParameterGroupFamily", "postgres16"),
        ("Description", "t"),
    ];
    let call = request("CreateDBParameterGroup", &params);
    let outcome = service.create_db_parameter_group(&call);
    assert_code(outcome, "DBParameterGroupAlreadyExists");
}
3309
/// Expects `DescribeDBParameterGroups` to return only the named group when
/// `DBParameterGroupName` is given, and both created groups when it is omitted.
#[test]
fn describe_db_parameter_groups_by_name_or_list() {
    let service = make_service();
    create_param_group(&service, "pg-alpha");
    create_param_group(&service, "pg-beta");

    // Lookup by name: only the requested group may appear.
    let named_params = [("DBParameterGroupName", "pg-alpha")];
    let named_call = request("DescribeDBParameterGroups", &named_params);
    let named_response = service.describe_db_parameter_groups(&named_call).unwrap();
    let named_xml = body_of(named_response);
    assert!(named_xml.contains("pg-alpha"));
    assert!(!named_xml.contains("pg-beta"));

    // Unfiltered listing: both created groups must appear.
    let listing_call = request("DescribeDBParameterGroups", &[]);
    let listing_response = service.describe_db_parameter_groups(&listing_call).unwrap();
    let listing_xml = body_of(listing_response);
    assert!(listing_xml.contains("pg-alpha"));
    assert!(listing_xml.contains("pg-beta"));
}
3327
/// Expects describing a parameter group that was never created to fail with
/// `DBParameterGroupNotFound`.
#[test]
fn describe_db_parameter_groups_unknown_name_errors() {
    let service = make_service();
    let params = [("DBParameterGroupName", "ghost")];
    let call = request("DescribeDBParameterGroups", &params);
    let outcome = service.describe_db_parameter_groups(&call);
    assert_code(outcome, "DBParameterGroupNotFound");
}
3340
/// Expects deleting the group named "default.postgres16" to be rejected with
/// `InvalidParameterValue`.
#[test]
fn delete_db_parameter_group_rejects_default_groups() {
    let service = make_service();
    let params = [("DBParameterGroupName", "default.postgres16")];
    let call = request("DeleteDBParameterGroup", &params);
    let outcome = service.delete_db_parameter_group(&call);
    assert_code(outcome, "InvalidParameterValue");
}
3350
/// Expects deleting a parameter group that was never created to fail with
/// `DBParameterGroupNotFound`.
#[test]
fn delete_db_parameter_group_unknown_errors() {
    let service = make_service();
    let params = [("DBParameterGroupName", "ghost")];
    let call = request("DeleteDBParameterGroup", &params);
    let outcome = service.delete_db_parameter_group(&call);
    assert_code(outcome, "DBParameterGroupNotFound");
}
3363
/// Expects `DeleteDBParameterGroup` to remove the "pg1" key from the default
/// state's `parameter_groups` map.
#[test]
fn delete_db_parameter_group_removes_entry() {
    let service = make_service();
    create_param_group(&service, "pg1");
    let params = [("DBParameterGroupName", "pg1")];
    let call = request("DeleteDBParameterGroup", &params);
    service.delete_db_parameter_group(&call).unwrap();

    // Only the specific key is checked; other groups may still be present.
    let guard = service.state.read();
    let still_present = guard.default_ref().parameter_groups.contains_key("pg1");
    assert!(!still_present);
}
3377
/// Expects `ModifyDBParameterGroup` with a `Description` parameter to
/// overwrite the stored description of the group.
#[test]
fn modify_db_parameter_group_updates_description() {
    let service = make_service();
    create_param_group(&service, "pg1");
    let params = [
        ("DBParameterGroupName", "pg1"),
        ("Description", "shiny new"),
    ];
    let call = request("ModifyDBParameterGroup", &params);
    service.modify_db_parameter_group(&call).unwrap();

    let guard = service.state.read();
    let group = guard.default_ref().parameter_groups.get("pg1").unwrap();
    assert_eq!(group.description, "shiny new");
}
3397
/// Expects modifying a parameter group that was never created to fail with
/// `DBParameterGroupNotFound`.
#[test]
fn modify_db_parameter_group_unknown_errors() {
    let service = make_service();
    let params = [("DBParameterGroupName", "ghost"), ("Description", "x")];
    let call = request("ModifyDBParameterGroup", &params);
    let outcome = service.modify_db_parameter_group(&call);
    assert_code(outcome, "DBParameterGroupNotFound");
}
3410
/// Expects `DescribeDBInstances` filtered by `DBInstanceIdentifier` to render
/// the requested instance and omit the other seeded one.
#[test]
fn describe_db_instances_by_id_returns_only_one() {
    let service = make_service();
    seed_instance(&service, "db1");
    seed_instance(&service, "db2");
    let params = [("DBInstanceIdentifier", "db1")];
    let call = request("DescribeDBInstances", &params);
    let response = service.describe_db_instances(&call).unwrap();
    let xml = body_of(response);
    assert!(xml.contains("<DBInstanceIdentifier>db1</DBInstanceIdentifier>"));
    assert!(!xml.contains("<DBInstanceIdentifier>db2</DBInstanceIdentifier>"));
}
3423
/// Expects describing an instance that was never seeded to fail with
/// `DBInstanceNotFound`.
#[test]
fn describe_db_instances_unknown_id_errors() {
    let service = make_service();
    let params = [("DBInstanceIdentifier", "ghost")];
    let call = request("DescribeDBInstances", &params);
    let outcome = service.describe_db_instances(&call);
    assert_code(outcome, "DBInstanceNotFound");
}
3430
/// Expects an unfiltered `DescribeDBInstances` to render an identifier
/// element for each of the three seeded instances.
#[test]
fn describe_db_instances_lists_all_when_unbounded() {
    let service = make_service();
    seed_instance(&service, "db1");
    seed_instance(&service, "db2");
    seed_instance(&service, "db3");
    let call = request("DescribeDBInstances", &[]);
    let response = service.describe_db_instances(&call).unwrap();
    let xml = body_of(response);
    for id in ["db1", "db2", "db3"] {
        let needle = format!("<DBInstanceIdentifier>{id}</DBInstanceIdentifier>");
        assert!(xml.contains(&needle));
    }
}
3445
/// Expects `ModifyDBInstance` carrying only the instance identifier to be
/// rejected with `InvalidParameterCombination`.
#[test]
fn modify_db_instance_requires_at_least_one_change() {
    let service = make_service();
    seed_instance(&service, "db1");
    let params = [("DBInstanceIdentifier", "db1")];
    let call = request("ModifyDBInstance", &params);
    let outcome = service.modify_db_instance(&call);
    assert_code(outcome, "InvalidParameterCombination");
}
3455
/// Expects changing the class of an instance that was never seeded to fail
/// with `DBInstanceNotFound`.
#[test]
fn modify_db_instance_unknown_errors() {
    let service = make_service();
    let params = [
        ("DBInstanceIdentifier", "ghost"),
        ("DBInstanceClass", "db.t3.small"),
    ];
    let call = request("ModifyDBInstance", &params);
    let outcome = service.modify_db_instance(&call);
    assert_code(outcome, "DBInstanceNotFound");
}
3468
/// Expects `ModifyDBInstance` with `ApplyImmediately=true` to write the new
/// instance class straight onto the stored instance.
#[test]
fn modify_db_instance_apply_immediately_updates_class() {
    let service = make_service();
    seed_instance(&service, "db1");
    let params = [
        ("DBInstanceIdentifier", "db1"),
        ("DBInstanceClass", "db.t3.small"),
        ("ApplyImmediately", "true"),
    ];
    let call = request("ModifyDBInstance", &params);
    service.modify_db_instance(&call).unwrap();

    let guard = service.state.read();
    let instance = guard.default_ref().instances.get("db1").unwrap();
    assert_eq!(instance.db_instance_class, "db.t3.small");
}
3489
/// Expects `ModifyDBInstance` with `ApplyImmediately=false` to keep the
/// current class at "db.t3.micro" and record the requested class under
/// `pending_modified_values`.
#[test]
fn modify_db_instance_pending_when_not_apply_immediately() {
    let service = make_service();
    seed_instance(&service, "db1");
    let params = [
        ("DBInstanceIdentifier", "db1"),
        ("DBInstanceClass", "db.t3.small"),
        ("ApplyImmediately", "false"),
    ];
    let call = request("ModifyDBInstance", &params);
    service.modify_db_instance(&call).unwrap();

    let guard = service.state.read();
    let instance = guard.default_ref().instances.get("db1").unwrap();
    // The live value is untouched ...
    assert_eq!(instance.db_instance_class, "db.t3.micro");
    // ... while the requested value is parked as a pending modification.
    let pending = instance.pending_modified_values.as_ref().unwrap();
    assert_eq!(pending.db_instance_class.as_deref(), Some("db.t3.small"));
}
3516
3517 fn seed_snapshot(svc: &RdsService, snapshot_id: &str, instance_id: &str) {
3520 let mut __a = svc.state.write();
3521 let state = __a.default_mut();
3522 let arn = state.db_snapshot_arn(snapshot_id);
3523 state.snapshots.insert(
3524 snapshot_id.to_string(),
3525 crate::state::DbSnapshot {
3526 db_snapshot_identifier: snapshot_id.to_string(),
3527 db_snapshot_arn: arn,
3528 db_instance_identifier: instance_id.to_string(),
3529 snapshot_create_time: Utc::now(),
3530 engine: "postgres".to_string(),
3531 engine_version: "16.3".to_string(),
3532 allocated_storage: 20,
3533 status: "available".to_string(),
3534 port: 5432,
3535 master_username: "admin".to_string(),
3536 db_name: Some("appdb".to_string()),
3537 dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3538 snapshot_type: "manual".to_string(),
3539 master_user_password: "secret".to_string(),
3540 tags: Vec::new(),
3541 dump_data: Vec::new(),
3542 },
3543 );
3544 }
3545
/// Expects `DeleteDBSnapshot` to leave the default state's `snapshots` map
/// empty after removing the only seeded snapshot.
#[test]
fn delete_db_snapshot_removes_entry() {
    let service = make_service();
    seed_snapshot(&service, "snap1", "db1");
    let params = [("DBSnapshotIdentifier", "snap1")];
    let call = request("DeleteDBSnapshot", &params);
    service.delete_db_snapshot(&call).unwrap();

    let guard = service.state.read();
    assert!(guard.default_ref().snapshots.is_empty());
}
3554
/// Expects deleting a snapshot that was never seeded to fail with
/// `DBSnapshotNotFound`.
#[test]
fn delete_db_snapshot_unknown_errors() {
    let service = make_service();
    let params = [("DBSnapshotIdentifier", "ghost")];
    let call = request("DeleteDBSnapshot", &params);
    let outcome = service.delete_db_snapshot(&call);
    assert_code(outcome, "DBSnapshotNotFound");
}
3561
/// Expects `DescribeDBSnapshots` given both a snapshot identifier and an
/// instance identifier to be rejected with `InvalidParameterCombination`.
#[test]
fn describe_db_snapshots_rejects_both_filters() {
    let service = make_service();
    let params = [("DBSnapshotIdentifier", "s"), ("DBInstanceIdentifier", "i")];
    let call = request("DescribeDBSnapshots", &params);
    let outcome = service.describe_db_snapshots(&call);
    assert_code(outcome, "InvalidParameterCombination");
}
3574
/// Expects `DescribeDBSnapshots` to honour a snapshot-identifier filter, an
/// instance-identifier filter, and to list everything when unfiltered.
#[test]
fn describe_db_snapshots_by_id_or_instance() {
    let service = make_service();
    seed_snapshot(&service, "snap1", "db1");
    seed_snapshot(&service, "snap2", "db2");

    // Filter by snapshot identifier.
    let id_params = [("DBSnapshotIdentifier", "snap1")];
    let id_call = request("DescribeDBSnapshots", &id_params);
    let id_xml = body_of(service.describe_db_snapshots(&id_call).unwrap());
    assert!(id_xml.contains("snap1"));
    assert!(!id_xml.contains("snap2"));

    // Filter by owning instance identifier.
    let instance_params = [("DBInstanceIdentifier", "db2")];
    let instance_call = request("DescribeDBSnapshots", &instance_params);
    let instance_xml = body_of(service.describe_db_snapshots(&instance_call).unwrap());
    assert!(instance_xml.contains("snap2"));
    assert!(!instance_xml.contains("snap1"));

    // No filter at all.
    let listing_call = request("DescribeDBSnapshots", &[]);
    let listing_xml = body_of(service.describe_db_snapshots(&listing_call).unwrap());
    assert!(listing_xml.contains("snap1"));
    assert!(listing_xml.contains("snap2"));
}
3596
/// Expects describing a snapshot that was never seeded to fail with
/// `DBSnapshotNotFound`.
#[test]
fn describe_db_snapshots_unknown_id_errors() {
    let service = make_service();
    let params = [("DBSnapshotIdentifier", "ghost")];
    let call = request("DescribeDBSnapshots", &params);
    let outcome = service.describe_db_snapshots(&call);
    assert_code(outcome, "DBSnapshotNotFound");
}
3603
/// Expects `DescribeDBInstances` for the identifier "ghost" on a fresh
/// service to fail with `DBInstanceNotFound`.
#[test]
fn describe_db_instances_not_found() {
    let service = make_service();
    let params = [("DBInstanceIdentifier", "ghost")];
    let call = request("DescribeDBInstances", &params);
    let outcome = service.describe_db_instances(&call);
    assert_code(outcome, "DBInstanceNotFound");
}
3612
3613 #[tokio::test]
3614 async fn delete_db_instance_not_found() {
3615 let svc = make_service();
3616 let req = request(
3617 "DeleteDBInstance",
3618 &[
3619 ("DBInstanceIdentifier", "ghost"),
3620 ("SkipFinalSnapshot", "true"),
3621 ],
3622 );
3623 assert_code(svc.delete_db_instance(&req).await, "DBInstanceNotFound");
3624 }
3625
/// Expects `ModifyDBInstance` for an unknown identifier carrying only
/// `AllocatedStorage` to fail with `InvalidParameterCombination`.
#[test]
fn modify_db_instance_not_found() {
    let service = make_service();
    let params = [
        ("DBInstanceIdentifier", "ghost"),
        ("AllocatedStorage", "20"),
    ];
    let call = request("ModifyDBInstance", &params);
    let outcome = service.modify_db_instance(&call);
    // NOTE(review): despite the test name, the asserted code is not a
    // not-found fault. Presumably `AllocatedStorage` alone does not count as a
    // change, so validation fails before the lookup; confirm against the handler.
    assert_code(outcome, "InvalidParameterCombination");
}
3639
3640 #[tokio::test]
3641 async fn reboot_db_instance_not_found() {
3642 let svc = make_service();
3643 let req = request("RebootDBInstance", &[("DBInstanceIdentifier", "ghost")]);
3644 assert_code(svc.reboot_db_instance(&req).await, "DBInstanceNotFound");
3645 }
3646
3647 #[tokio::test]
3648 async fn create_db_snapshot_instance_not_found() {
3649 let svc = make_service();
3650 let req = request(
3651 "CreateDBSnapshot",
3652 &[
3653 ("DBInstanceIdentifier", "ghost"),
3654 ("DBSnapshotIdentifier", "snap1"),
3655 ],
3656 );
3657 assert_code(svc.create_db_snapshot(&req).await, "InvalidParameterValue");
3658 }
3659
3660 #[tokio::test]
3661 async fn restore_db_instance_snapshot_not_found() {
3662 let svc = make_service();
3663 let req = request(
3664 "RestoreDBInstanceFromDBSnapshot",
3665 &[
3666 ("DBInstanceIdentifier", "restored"),
3667 ("DBSnapshotIdentifier", "ghost-snap"),
3668 ],
3669 );
3670 assert_code(
3671 svc.restore_db_instance_from_db_snapshot(&req).await,
3672 "InvalidParameterValue",
3673 );
3674 }
3675
3676 #[tokio::test]
3677 async fn create_db_instance_read_replica_source_not_found() {
3678 let svc = make_service();
3679 let req = request(
3680 "CreateDBInstanceReadReplica",
3681 &[
3682 ("DBInstanceIdentifier", "replica"),
3683 ("SourceDBInstanceIdentifier", "ghost"),
3684 ],
3685 );
3686 assert_code(
3687 svc.create_db_instance_read_replica(&req).await,
3688 "InvalidParameterValue",
3689 );
3690 }
3691
/// Smoke test: an unfiltered `DescribeDBEngineVersions` succeeds and its body
/// contains a `<DBEngineVersions>` element.
#[test]
fn describe_db_engine_versions_basic() {
    let service = make_service();
    let call = request("DescribeDBEngineVersions", &[]);
    let xml = body_of(service.describe_db_engine_versions(&call).unwrap());
    assert!(xml.contains("<DBEngineVersions>"));
}
3700
/// Smoke test: `DescribeOrderableDBInstanceOptions` for the "mysql" engine
/// succeeds and its body contains an `<OrderableDBInstanceOptions>` element.
#[test]
fn describe_orderable_db_instance_options_basic() {
    let service = make_service();
    let params = [("Engine", "mysql")];
    let call = request("DescribeOrderableDBInstanceOptions", &params);
    let xml = body_of(service.describe_orderable_db_instance_options(&call).unwrap());
    assert!(xml.contains("<OrderableDBInstanceOptions>"));
}
3709
/// Expects `DescribeDBParameterGroups` for the name "ghost" on a fresh
/// service to fail with `DBParameterGroupNotFound`.
#[test]
fn describe_db_parameter_group_not_found() {
    let service = make_service();
    let params = [("DBParameterGroupName", "ghost")];
    let call = request("DescribeDBParameterGroups", &params);
    let outcome = service.describe_db_parameter_groups(&call);
    assert_code(outcome, "DBParameterGroupNotFound");
}
3722
/// Expects `DeleteDBParameterGroup` for the name "ghost" on a fresh service
/// to fail with `DBParameterGroupNotFound`.
#[test]
fn delete_db_parameter_group_not_found() {
    let service = make_service();
    let params = [("DBParameterGroupName", "ghost")];
    let call = request("DeleteDBParameterGroup", &params);
    let outcome = service.delete_db_parameter_group(&call);
    assert_code(outcome, "DBParameterGroupNotFound");
}
3735
/// Expects `DescribeDBSubnetGroups` for the name "ghost" on a fresh service
/// to fail with `DBSubnetGroupNotFoundFault`.
#[test]
fn describe_db_subnet_group_not_found() {
    let service = make_service();
    let params = [("DBSubnetGroupName", "ghost")];
    let call = request("DescribeDBSubnetGroups", &params);
    let outcome = service.describe_db_subnet_groups(&call);
    assert_code(outcome, "DBSubnetGroupNotFoundFault");
}
3745
/// Expects `DeleteDBSubnetGroup` for the name "ghost" on a fresh service to
/// fail with `DBSubnetGroupNotFoundFault`.
#[test]
fn delete_db_subnet_group_not_found() {
    let service = make_service();
    let params = [("DBSubnetGroupName", "ghost")];
    let call = request("DeleteDBSubnetGroup", &params);
    let outcome = service.delete_db_subnet_group(&call);
    assert_code(outcome, "DBSubnetGroupNotFoundFault");
}
3755
/// Expects `AddTagsToResource` for an ARN naming an unknown instance, with
/// tags passed as `Tags.member.N`, to fail with `MissingParameter`.
#[test]
fn add_tags_resource_not_found() {
    let service = make_service();
    let params = [
        ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
        ("Tags.member.1.Key", "k"),
        ("Tags.member.1.Value", "v"),
    ];
    let call = request("AddTagsToResource", &params);
    let outcome = service.add_tags_to_resource(&call);
    // NOTE(review): despite the test name, the asserted code is not a
    // not-found fault. Presumably the handler does not recognise the
    // `Tags.member.N` prefix and reports the tags as missing before resolving
    // the resource; confirm against the tag parser.
    assert_code(outcome, "MissingParameter");
}
3769
/// Expects `ListTagsForResource` for an ARN naming an unknown instance to
/// fail with `DBInstanceNotFound`.
#[test]
fn list_tags_resource_not_found() {
    let service = make_service();
    let params = [("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost")];
    let call = request("ListTagsForResource", &params);
    let outcome = service.list_tags_for_resource(&call);
    assert_code(outcome, "DBInstanceNotFound");
}
3779
3780 #[tokio::test]
3783 async fn create_db_snapshot_missing_id_errors() {
3784 let svc = make_service();
3785 let req = request(
3786 "CreateDBSnapshot",
3787 &[("DBInstanceIdentifier", "nonexistent")],
3788 );
3789 assert_code(svc.create_db_snapshot(&req).await, "MissingParameter");
3790 }
3791
3792 #[tokio::test]
3793 async fn create_db_snapshot_unknown_instance_errors() {
3794 let svc = make_service();
3795 let req = request(
3796 "CreateDBSnapshot",
3797 &[
3798 ("DBSnapshotIdentifier", "snap1"),
3799 ("DBInstanceIdentifier", "ghost"),
3800 ],
3801 );
3802 assert!(svc.create_db_snapshot(&req).await.is_err());
3803 }
3804
3805 #[tokio::test]
3808 async fn delete_db_instance_missing_id_errors() {
3809 let svc = make_service();
3810 let req = request("DeleteDBInstance", &[]);
3811 assert_code(svc.delete_db_instance(&req).await, "MissingParameter");
3812 }
3813
3814 #[tokio::test]
3817 async fn reboot_db_instance_missing_id_errors() {
3818 let svc = make_service();
3819 let req = request("RebootDBInstance", &[]);
3820 assert_code(svc.reboot_db_instance(&req).await, "MissingParameter");
3821 }
3822
3823 #[tokio::test]
3826 async fn create_db_instance_missing_id_errors() {
3827 let svc = make_service();
3828 let req = request(
3829 "CreateDBInstance",
3830 &[
3831 ("Engine", "postgres"),
3832 ("DBInstanceClass", "db.t3.micro"),
3833 ("AllocatedStorage", "20"),
3834 ("MasterUsername", "admin"),
3835 ("MasterUserPassword", "secretpass"),
3836 ],
3837 );
3838 assert!(svc.create_db_instance(&req).await.is_err());
3839 }
3840
3841 #[tokio::test]
3842 async fn create_db_instance_unsupported_engine_errors() {
3843 let svc = make_service();
3844 let req = request(
3845 "CreateDBInstance",
3846 &[
3847 ("DBInstanceIdentifier", "db1"),
3848 ("Engine", "mongodb"),
3849 ("DBInstanceClass", "db.t3.micro"),
3850 ("AllocatedStorage", "20"),
3851 ("MasterUsername", "admin"),
3852 ("MasterUserPassword", "secretpass"),
3853 ],
3854 );
3855 assert!(svc.create_db_instance(&req).await.is_err());
3856 }
3857
3858 #[tokio::test]
3861 async fn restore_db_instance_missing_ids_errors() {
3862 let svc = make_service();
3863 let req = request("RestoreDBInstanceFromDBSnapshot", &[]);
3864 assert!(svc
3865 .restore_db_instance_from_db_snapshot(&req)
3866 .await
3867 .is_err());
3868 }
3869
3870 #[tokio::test]
3871 async fn restore_db_instance_unknown_snapshot_errors() {
3872 let svc = make_service();
3873 let req = request(
3874 "RestoreDBInstanceFromDBSnapshot",
3875 &[
3876 ("DBInstanceIdentifier", "restored"),
3877 ("DBSnapshotIdentifier", "missing"),
3878 ],
3879 );
3880 assert!(svc
3881 .restore_db_instance_from_db_snapshot(&req)
3882 .await
3883 .is_err());
3884 }
3885
3886 #[tokio::test]
3889 async fn create_read_replica_missing_source_errors() {
3890 let svc = make_service();
3891 let req = request(
3892 "CreateDBInstanceReadReplica",
3893 &[("DBInstanceIdentifier", "replica1")],
3894 );
3895 assert!(svc.create_db_instance_read_replica(&req).await.is_err());
3896 }
3897
3898 #[tokio::test]
3899 async fn create_read_replica_unknown_source_errors() {
3900 let svc = make_service();
3901 let req = request(
3902 "CreateDBInstanceReadReplica",
3903 &[
3904 ("DBInstanceIdentifier", "replica1"),
3905 ("SourceDBInstanceIdentifier", "ghost"),
3906 ],
3907 );
3908 assert!(svc.create_db_instance_read_replica(&req).await.is_err());
3909 }
3910
/// Expects `DescribeDBSnapshots` filtered by snapshot identifier to render a
/// `<DBSnapshotIdentifier>` element for the seeded snapshot.
#[test]
fn describe_db_snapshots_by_snapshot_id_only() {
    let service = make_service();
    seed_snapshot(&service, "s1", "inst1");
    let params = [("DBSnapshotIdentifier", "s1")];
    let call = request("DescribeDBSnapshots", &params);
    let xml = body_of(service.describe_db_snapshots(&call).unwrap());
    assert!(xml.contains("<DBSnapshotIdentifier>s1</DBSnapshotIdentifier>"));
}
3922
/// Expects `DescribeDBSnapshots` filtered by `DBInstanceIdentifier` to return
/// the snapshot of that instance and omit snapshots of other instances.
#[test]
fn describe_db_snapshots_by_instance_id_returns_matching() {
    let svc = make_service();
    seed_snapshot(&svc, "s1", "inst1");
    seed_snapshot(&svc, "s2", "inst2");
    let req = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "inst1")]);
    let resp = svc.describe_db_snapshots(&req).unwrap();
    let b = body_of(resp);
    // Match the whole identifier element: a bare "s1" substring could be
    // satisfied by unrelated response text, which would hide a broken filter.
    // This also keeps the positive check symmetric with the negative one.
    assert!(b.contains("<DBSnapshotIdentifier>s1</DBSnapshotIdentifier>"));
    assert!(!b.contains("<DBSnapshotIdentifier>s2</DBSnapshotIdentifier>"));
}
3934
/// Expects `ModifyDBParameterGroup` sent with no parameters to return an
/// error (the specific code is not checked here).
#[test]
fn modify_db_parameter_group_missing_name() {
    let service = make_service();
    let call = request("ModifyDBParameterGroup", &[]);
    let outcome = service.modify_db_parameter_group(&call);
    assert!(outcome.is_err());
}
3943
/// Expects `ModifyDBSubnetGroup` for a group that was never created to return
/// an error (the specific code is not checked here).
#[test]
fn modify_db_subnet_group_unknown_errors() {
    let service = make_service();
    let params = [
        ("DBSubnetGroupName", "ghost"),
        ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
        ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
    ];
    let call = request("ModifyDBSubnetGroup", &params);
    let outcome = service.modify_db_subnet_group(&call);
    assert!(outcome.is_err());
}
3959
/// Expects `DescribeDBInstances` on a service with no seeded instances to
/// succeed and mention `DescribeDBInstancesResult` in its body.
#[test]
fn describe_db_instances_empty_returns_xml() {
    let service = make_service();
    let call = request("DescribeDBInstances", &[]);
    let xml = body_of(service.describe_db_instances(&call).unwrap());
    assert!(xml.contains("DescribeDBInstancesResult"));
}
3970
/// Expects `DescribeDBSnapshots` on a service with no seeded snapshots to
/// succeed and mention `DescribeDBSnapshotsResult` in its body.
#[test]
fn describe_db_snapshots_empty_returns_empty_list() {
    let service = make_service();
    let call = request("DescribeDBSnapshots", &[]);
    let xml = body_of(service.describe_db_snapshots(&call).unwrap());
    assert!(xml.contains("DescribeDBSnapshotsResult"));
}
3979
/// Expects `AddTagsToResource` for an ARN naming an unknown instance to
/// return an error (the specific code is not checked here).
#[test]
fn add_tags_unknown_resource_errors() {
    let service = make_service();
    let params = [
        ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
        ("Tags.member.1.Key", "k"),
        ("Tags.member.1.Value", "v"),
    ];
    let call = request("AddTagsToResource", &params);
    let outcome = service.add_tags_to_resource(&call);
    assert!(outcome.is_err());
}
3993
/// Expects `RemoveTagsFromResource` for an ARN naming an unknown instance to
/// return an error (the specific code is not checked here).
#[test]
fn remove_tags_unknown_resource_errors() {
    let service = make_service();
    let params = [
        ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
        ("TagKeys.member.1", "k"),
    ];
    let call = request("RemoveTagsFromResource", &params);
    let outcome = service.remove_tags_from_resource(&call);
    assert!(outcome.is_err());
}
4006
/// Expects `CreateDBParameterGroup` sent without `DBParameterGroupName` to
/// return an error (the specific code is not checked here).
#[test]
fn create_db_parameter_group_missing_name_errors() {
    let service = make_service();
    let params = [
        ("DBParameterGroupFamily", "postgres16"),
        ("Description", "d"),
    ];
    let call = request("CreateDBParameterGroup", &params);
    let outcome = service.create_db_parameter_group(&call);
    assert!(outcome.is_err());
}
4019
/// Expects `CreateDBSubnetGroup` sent without `DBSubnetGroupDescription` to
/// return an error (the specific code is not checked here).
#[test]
fn create_db_subnet_group_missing_desc_errors() {
    let service = make_service();
    let params = [
        ("DBSubnetGroupName", "sg1"),
        ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
        ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
    ];
    let call = request("CreateDBSubnetGroup", &params);
    let outcome = service.create_db_subnet_group(&call);
    assert!(outcome.is_err());
}
4033
4034 #[tokio::test]
4035 async fn create_db_instance_missing_class_errors() {
4036 let svc = make_service();
4037 let req = request(
4038 "CreateDBInstance",
4039 &[
4040 ("DBInstanceIdentifier", "miss-class"),
4041 ("Engine", "postgres"),
4042 ("AllocatedStorage", "20"),
4043 ("MasterUsername", "admin"),
4044 ("MasterUserPassword", "secretpass"),
4045 ],
4046 );
4047 assert!(svc.create_db_instance(&req).await.is_err());
4048 }
4049
4050 #[tokio::test]
4051 async fn create_db_instance_missing_master_username_errors() {
4052 let svc = make_service();
4053 let req = request(
4054 "CreateDBInstance",
4055 &[
4056 ("DBInstanceIdentifier", "miss-mu"),
4057 ("Engine", "postgres"),
4058 ("DBInstanceClass", "db.t3.micro"),
4059 ("AllocatedStorage", "20"),
4060 ("MasterUserPassword", "secretpass"),
4061 ],
4062 );
4063 assert!(svc.create_db_instance(&req).await.is_err());
4064 }
4065
/// Expects `ModifyDBInstance` sent with no parameters to return an error
/// (the specific code is not checked here).
#[test]
fn modify_db_instance_missing_id_errors() {
    let service = make_service();
    let call = request("ModifyDBInstance", &[]);
    let outcome = service.modify_db_instance(&call);
    assert!(outcome.is_err());
}
4072
/// Expects `ModifyDBParameterGroup` carrying a parameter change for a group
/// that was never created to return an error (code not checked here).
#[test]
fn modify_db_parameter_group_unknown_pg_errors() {
    let service = make_service();
    let params = [
        ("DBParameterGroupName", "ghost"),
        ("Parameters.member.1.ParameterName", "p"),
        ("Parameters.member.1.ParameterValue", "v"),
        ("Parameters.member.1.ApplyMethod", "immediate"),
    ];
    let call = request("ModifyDBParameterGroup", &params);
    let outcome = service.modify_db_parameter_group(&call);
    assert!(outcome.is_err());
}
4087
/// Expects `DescribeDBParameterGroups` for the name "ghost" to return an
/// error (the specific code is not checked here).
#[test]
fn describe_db_parameter_groups_unknown_errors() {
    let service = make_service();
    let params = [("DBParameterGroupName", "ghost")];
    let call = request("DescribeDBParameterGroups", &params);
    let outcome = service.describe_db_parameter_groups(&call);
    assert!(outcome.is_err());
}
4097
/// Expects `DescribeDBSubnetGroups` for the name "ghost" to return an error
/// (the specific code is not checked here).
#[test]
fn describe_db_subnet_groups_unknown_errors() {
    let service = make_service();
    let params = [("DBSubnetGroupName", "ghost")];
    let call = request("DescribeDBSubnetGroups", &params);
    let outcome = service.describe_db_subnet_groups(&call);
    assert!(outcome.is_err());
}
4104}