use super::*;
impl RdsService {
pub(super) async fn create_db_instance(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
let db_instance_class = required_query_param(request, "DBInstanceClass")?;
let engine = required_query_param(request, "Engine")?;
let allocated_storage = optional_i32_param(request, "AllocatedStorage")?
.filter(|v| *v > 0)
.unwrap_or(20);
let master_username =
optional_query_param(request, "MasterUsername").unwrap_or_else(|| "admin".to_string());
let master_user_password = optional_query_param(request, "MasterUserPassword")
.unwrap_or_else(|| "Password1!".to_string());
let db_name = optional_query_param(request, "DBName");
let engine_version =
optional_query_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
let publicly_accessible =
parse_optional_bool(optional_query_param(request, "PubliclyAccessible").as_deref())?
.unwrap_or(true);
let deletion_protection =
parse_optional_bool(optional_query_param(request, "DeletionProtection").as_deref())?
.unwrap_or(false);
let port = optional_i32_param(request, "Port")?
.unwrap_or_else(|| default_port_for_engine(&engine));
let vpc_security_group_ids = parse_vpc_security_group_ids(request);
let db_parameter_group_name = optional_query_param(request, "DBParameterGroupName")
.or_else(|| Some(default_parameter_group(&engine, &engine_version)));
let backup_retention_period =
optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
let preferred_backup_window = optional_query_param(request, "PreferredBackupWindow")
.unwrap_or_else(|| "03:00-04:00".to_string());
let option_group_name = optional_query_param(request, "OptionGroupName");
let multi_az = parse_optional_bool(optional_query_param(request, "MultiAZ").as_deref())?
.unwrap_or(false);
let availability_zone = optional_query_param(request, "AvailabilityZone");
let storage_type = optional_query_param(request, "StorageType");
let storage_encrypted =
parse_optional_bool(optional_query_param(request, "StorageEncrypted").as_deref())?
.unwrap_or(false);
let kms_key_id = optional_query_param(request, "KmsKeyId");
let iam_database_authentication_enabled = parse_optional_bool(
optional_query_param(request, "EnableIAMDatabaseAuthentication").as_deref(),
)?
.unwrap_or(false);
let iops = optional_i32_param(request, "Iops")?;
let monitoring_interval = optional_i32_param(request, "MonitoringInterval")?;
let monitoring_role_arn = optional_query_param(request, "MonitoringRoleArn");
let performance_insights_enabled = parse_optional_bool(
optional_query_param(request, "EnablePerformanceInsights").as_deref(),
)?
.unwrap_or(false);
let performance_insights_kms_key_id =
optional_query_param(request, "PerformanceInsightsKMSKeyId");
let performance_insights_retention_period =
optional_i32_param(request, "PerformanceInsightsRetentionPeriod")?;
let enabled_cloudwatch_logs_exports =
parse_cloudwatch_logs_exports(request, "EnableCloudwatchLogsExports");
let ca_certificate_identifier = optional_query_param(request, "CACertificateIdentifier");
let network_type = optional_query_param(request, "NetworkType");
let character_set_name = optional_query_param(request, "CharacterSetName");
let auto_minor_version_upgrade = parse_optional_bool(
optional_query_param(request, "AutoMinorVersionUpgrade").as_deref(),
)?;
let copy_tags_to_snapshot =
parse_optional_bool(optional_query_param(request, "CopyTagsToSnapshot").as_deref())?;
let db_cluster_identifier = optional_query_param(request, "DBClusterIdentifier");
let request_tags = parse_tags(request)?;
validate_create_request(
&db_instance_identifier,
allocated_storage,
&db_instance_class,
&engine,
&engine_version,
port,
)?;
{
let mut accounts = self.state.write();
let state = accounts.get_or_create(&request.account_id);
if !state.begin_instance_creation(&db_instance_identifier) {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"DBInstanceAlreadyExists",
format!("DBInstance {} already exists.", db_instance_identifier),
));
}
if let Some(ref pg_name) = db_parameter_group_name {
if !state.parameter_groups.contains_key(pg_name) {
state.cancel_instance_creation(&db_instance_identifier);
return Err(AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"DBParameterGroupNotFound",
format!("DBParameterGroup {} not found.", pg_name),
));
}
}
}
let runtime = self.require_runtime()?.clone();
let logical_db_name = db_name
.clone()
.unwrap_or_else(|| default_db_name(&engine).to_string());
let created_at = Utc::now();
let instance = {
let mut accounts = self.state.write();
let state = accounts.get_or_create(&request.account_id);
let placeholder = DbInstance {
db_instance_identifier: db_instance_identifier.clone(),
db_instance_arn: state.db_instance_arn(&db_instance_identifier),
db_instance_class: db_instance_class.clone(),
engine: engine.clone(),
engine_version: engine_version.clone(),
db_instance_status: "creating".to_string(),
master_username: master_username.clone(),
db_name: db_name.clone(),
endpoint_address: String::new(),
port: 0,
allocated_storage,
publicly_accessible,
deletion_protection,
created_at,
dbi_resource_id: state.next_dbi_resource_id(),
master_user_password: master_user_password.clone(),
container_id: String::new(),
host_port: 0,
tags: request_tags,
read_replica_source_db_instance_identifier: None,
read_replica_db_instance_identifiers: Vec::new(),
vpc_security_group_ids,
db_parameter_group_name,
backup_retention_period,
preferred_backup_window,
preferred_maintenance_window: None,
latest_restorable_time: if backup_retention_period > 0 {
Some(created_at)
} else {
None
},
option_group_name,
multi_az,
pending_modified_values: None,
availability_zone,
storage_type,
storage_encrypted,
kms_key_id,
iam_database_authentication_enabled,
iops,
monitoring_interval,
monitoring_role_arn,
performance_insights_enabled,
performance_insights_kms_key_id,
performance_insights_retention_period,
enabled_cloudwatch_logs_exports,
ca_certificate_identifier,
network_type,
character_set_name,
auto_minor_version_upgrade,
copy_tags_to_snapshot,
master_user_secret_arn: None,
master_user_secret_kms_key_id: None,
license_model: None,
max_allocated_storage: None,
multi_tenant: None,
storage_throughput: None,
tde_credential_arn: None,
delete_automated_backups: None,
db_security_groups: Vec::new(),
domain: None,
domain_fqdn: None,
domain_ou: None,
domain_iam_role_name: None,
domain_auth_secret_arn: None,
domain_dns_ips: Vec::new(),
db_cluster_identifier: db_cluster_identifier.clone(),
};
state.finish_instance_creation(placeholder.clone());
placeholder
};
let instance_arn = instance.db_instance_arn.clone();
self.emit_event(
RdsSourceType::DbInstance,
&db_instance_identifier,
&instance_arn,
"RDS-EVENT-0005",
&["creation"],
"DB instance created",
);
{
let state_handle = self.state.clone();
let delivery_bus = self.delivery_bus.clone();
let runtime = runtime.clone();
let id = db_instance_identifier.clone();
let engine = engine.clone();
let engine_version = engine_version.clone();
let master_username = master_username.clone();
let master_user_password = master_user_password.clone();
let logical_db_name_task = logical_db_name.clone();
let account_id = request.account_id.clone();
let region = request.region.clone();
let arn = instance_arn.clone();
let snapshot_store = self.snapshot_store.clone();
let snapshot_lock = self.snapshot_lock.clone();
let cluster_id_for_attach = db_cluster_identifier.clone();
let instance_tags = instance.tags.clone();
tokio::spawn(async move {
match runtime
.ensure_postgres(
&id,
&engine,
&engine_version,
&master_username,
&master_user_password,
&logical_db_name_task,
&account_id,
®ion,
&instance_tags,
)
.await
{
Ok(running) => {
let pending_dump = if let Some(ref cid) = cluster_id_for_attach {
let mut accounts = state_handle.write();
let state = accounts.get_or_create(&account_id);
state
.extras
.get_mut("clusters")
.and_then(|m| m.get_mut(cid))
.and_then(|entry| entry.as_object_mut())
.and_then(|obj| obj.remove("PendingRestoreDumpB64"))
.and_then(|v| v.as_str().map(str::to_string))
.and_then(|b64| {
use base64::Engine;
base64::engine::general_purpose::STANDARD
.decode(b64.as_bytes())
.ok()
})
} else {
None
};
if let Some(dump) = pending_dump {
if let Err(error) = runtime
.restore_database(
&id,
&engine,
&master_username,
&master_user_password,
&logical_db_name_task,
&dump,
)
.await
{
tracing::error!(%error, db_instance_identifier=%id, "cluster restore dump replay failed");
}
}
let instance_present = {
let mut accounts = state_handle.write();
let state = accounts.get_or_create(&account_id);
if let Some(inst) = state.instances.get_mut(&id) {
inst.db_instance_status = "available".to_string();
inst.endpoint_address = running.endpoint_address.clone();
inst.port = i32::from(running.endpoint_port);
inst.host_port = running.host_port;
inst.container_id = running.container_id;
if let Some(ref cid) = cluster_id_for_attach {
attach_cluster_member(state, cid, &id);
}
true
} else {
false
}
};
if !instance_present {
tracing::info!(
db_instance_identifier = %id,
"instance deleted during create; reaping orphaned backing container",
);
runtime.stop_container(&id).await;
save_snapshot_static(
state_handle.clone(),
snapshot_store.clone(),
snapshot_lock.clone(),
)
.await;
return;
}
save_snapshot_static(
state_handle.clone(),
snapshot_store.clone(),
snapshot_lock.clone(),
)
.await;
}
Err(error) => {
tracing::error!(%error, db_instance_identifier=%id, "create_db_instance background task failed");
{
let mut accounts = state_handle.write();
let state = accounts.get_or_create(&account_id);
state.instances.remove(&id);
}
save_snapshot_static(
state_handle.clone(),
snapshot_store.clone(),
snapshot_lock.clone(),
)
.await;
emit_event_static(
delivery_bus.as_ref(),
RdsSourceType::DbInstance,
&id,
&arn,
"RDS-EVENT-0058",
&["failure"],
&format!("DB instance failed to create: {}", error),
);
}
}
});
}
Ok(AwsResponse::xml(
StatusCode::OK,
query_response_xml(
"CreateDBInstance",
RDS_NS,
&format!(
"<DBInstance>{}</DBInstance>",
db_instance_xml(&instance, None)
),
&request.request_id,
),
))
}
/// Handles the `DeleteDBInstance` action.
///
/// Checks the `SkipFinalSnapshot` / `FinalDBSnapshotIdentifier` combination,
/// refuses to delete an instance that has deletion protection enabled,
/// optionally calls `create_final_db_snapshot`, then removes the instance
/// from state, unlinks it from its read-replica source and replicas, stops
/// the backing container, removes its data volume and emits `RDS-EVENT-0003`.
///
/// # Errors
///
/// Returns `InvalidDBInstanceState` for an invalid snapshot option
/// combination or a deletion-protected instance, the error produced by
/// `db_instance_not_found` when the identifier is unknown, and any error
/// returned by `create_final_db_snapshot`.
pub(super) async fn delete_db_instance(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
    let skip_final_snapshot =
        parse_optional_bool(optional_query_param(request, "SkipFinalSnapshot").as_deref())?
            .unwrap_or(false);
    let final_db_snapshot_identifier =
        optional_query_param(request, "FinalDBSnapshotIdentifier");

    // Exactly one of "skip" and "snapshot identifier given" has to hold.
    let snapshot_option_error =
        match (skip_final_snapshot, final_db_snapshot_identifier.as_ref()) {
            (true, Some(_)) => Some(
                "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
            ),
            (false, None) => Some(
                "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
            ),
            _ => None,
        };
    if let Some(message) = snapshot_option_error {
        return Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "InvalidDBInstanceState",
            message,
        ));
    }

    // Read-only pre-check so no snapshot is taken for a missing or protected instance.
    {
        let accounts = self.state.read();
        let empty = RdsState::new(&request.account_id, &request.region);
        let state = accounts.get(&request.account_id).unwrap_or(&empty);
        match state.instances.get(&db_instance_identifier) {
            None => return Err(db_instance_not_found(&db_instance_identifier)),
            Some(existing) if existing.deletion_protection => {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidDBInstanceState",
                    format!(
                        "DBInstance {} cannot be deleted because deletion protection is enabled.",
                        db_instance_identifier
                    ),
                ));
            }
            Some(_) => {}
        }
    }

    if let Some(snapshot_id) = &final_db_snapshot_identifier {
        self.create_final_db_snapshot(
            &db_instance_identifier,
            snapshot_id,
            &request.account_id,
            &request.region,
        )
        .await?;
    }

    let removed = {
        let mut accounts = self.state.write();
        let state = accounts.get_or_create(&request.account_id);
        // The instance may have disappeared since the read-only check above.
        let removed = match state.instances.remove(&db_instance_identifier) {
            Some(removed) => removed,
            None => return Err(db_instance_not_found(&db_instance_identifier)),
        };
        // If this was a replica, drop it from its source's replica list.
        if let Some(source_id) = removed.read_replica_source_db_instance_identifier.as_ref() {
            if let Some(source) = state.instances.get_mut(source_id) {
                source
                    .read_replica_db_instance_identifiers
                    .retain(|replica_id| replica_id != &db_instance_identifier);
            }
        }
        // If this was a source, clear the back-reference on each of its replicas.
        for replica_id in removed.read_replica_db_instance_identifiers.iter() {
            if let Some(replica) = state.instances.get_mut(replica_id) {
                replica.read_replica_source_db_instance_identifier = None;
            }
        }
        removed
    };

    if let Some(runtime) = self.runtime.as_ref() {
        runtime.stop_container(&db_instance_identifier).await;
        runtime
            .remove_data_volume(&request.account_id, &db_instance_identifier)
            .await;
    }

    self.emit_event(
        RdsSourceType::DbInstance,
        &db_instance_identifier,
        &removed.db_instance_arn,
        "RDS-EVENT-0003",
        &["deletion"],
        "DB instance deleted",
    );

    let payload = format!(
        "<DBInstance>{}</DBInstance>",
        db_instance_xml(&removed, Some("deleting"))
    );
    let body = query_response_xml("DeleteDBInstance", RDS_NS, &payload, &request.request_id);
    Ok(AwsResponse::xml(StatusCode::OK, body))
}
/// Handles the `ModifyDBInstance` action.
///
/// Request parameters fall into two groups:
///
/// * settings that are always written straight to the instance (deletion
///   protection, security groups, CA certificate, domain settings, CloudWatch
///   log exports, password rotation, ...);
/// * settings that honour `ApplyImmediately` (instance class, master
///   password, engine version, storage, port, backup and maintenance windows,
///   ...). They are written to the instance unless `ApplyImmediately` is
///   explicitly `false`, in which case they are staged in
///   `pending_modified_values`.
///
/// The response is rendered by passing `Some("modifying")` to
/// `db_instance_xml`, and `RDS-EVENT-0014` is emitted once the state lock has
/// been released.
///
/// # Errors
///
/// Returns an error when `DBInstanceIdentifier` is missing, when a boolean or
/// integer parameter fails to parse, when `validate_db_instance_class`
/// rejects the requested class, or when the instance does not exist.
pub(super) fn modify_db_instance(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
    let apply_immediately =
        parse_optional_bool(optional_query_param(request, "ApplyImmediately").as_deref())?;
    let deletion_protection =
        parse_optional_bool(optional_query_param(request, "DeletionProtection").as_deref())?;
    let backup_retention_period =
        parse_optional_i32(optional_query_param(request, "BackupRetentionPeriod").as_deref())?;
    let preferred_backup_window = optional_query_param(request, "PreferredBackupWindow");
    let preferred_maintenance_window =
        optional_query_param(request, "PreferredMaintenanceWindow");
    let db_parameter_group_name = optional_query_param(request, "DBParameterGroupName");
    let master_user_secret_kms_key_id =
        optional_query_param(request, "MasterUserSecretKmsKeyId");
    let ca_certificate_identifier = optional_query_param(request, "CACertificateIdentifier");
    let monitoring_interval =
        parse_optional_i32(optional_query_param(request, "MonitoringInterval").as_deref())?;
    let option_group_name = optional_query_param(request, "OptionGroupName");
    let auto_minor_version_upgrade = parse_optional_bool(
        optional_query_param(request, "AutoMinorVersionUpgrade").as_deref(),
    )?;
    let copy_tags_to_snapshot =
        parse_optional_bool(optional_query_param(request, "CopyTagsToSnapshot").as_deref())?;
    let delete_automated_backups = parse_optional_bool(
        optional_query_param(request, "DeleteAutomatedBackups").as_deref(),
    )?;
    let enable_iam_db_auth = parse_optional_bool(
        optional_query_param(request, "EnableIAMDatabaseAuthentication").as_deref(),
    )?;
    let max_allocated_storage =
        parse_optional_i32(optional_query_param(request, "MaxAllocatedStorage").as_deref())?;
    let network_type = optional_query_param(request, "NetworkType");
    let domain = optional_query_param(request, "Domain");
    let domain_fqdn = optional_query_param(request, "DomainFqdn");
    let domain_ou = optional_query_param(request, "DomainOu");
    let domain_iam_role_name = optional_query_param(request, "DomainIAMRoleName");
    let domain_auth_secret_arn = optional_query_param(request, "DomainAuthSecretArn");
    // An empty list means "not supplied", so the stored list is left untouched.
    let domain_dns_ips =
        Some(parse_string_member_list(request, "DomainDnsIps")).filter(|ips| !ips.is_empty());
    let disable_domain =
        parse_optional_bool(optional_query_param(request, "DisableDomain").as_deref())?;
    let rotate_master_user_password = parse_optional_bool(
        optional_query_param(request, "RotateMasterUserPassword").as_deref(),
    )?;
    let db_instance_class = optional_query_param(request, "DBInstanceClass");
    let master_user_password = optional_query_param(request, "MasterUserPassword");
    let engine_version = optional_query_param(request, "EngineVersion");
    let allocated_storage =
        parse_optional_i32(optional_query_param(request, "AllocatedStorage").as_deref())?;
    let multi_az = parse_optional_bool(optional_query_param(request, "MultiAZ").as_deref())?;
    let iops = parse_optional_i32(optional_query_param(request, "Iops").as_deref())?;
    let storage_type = optional_query_param(request, "StorageType");
    let storage_throughput =
        parse_optional_i32(optional_query_param(request, "StorageThroughput").as_deref())?;
    let performance_insights_enabled = parse_optional_bool(
        optional_query_param(request, "EnablePerformanceInsights").as_deref(),
    )?;
    let license_model = optional_query_param(request, "LicenseModel");
    let multi_tenant =
        parse_optional_bool(optional_query_param(request, "MultiTenant").as_deref())?;
    let publicly_accessible =
        parse_optional_bool(optional_query_param(request, "PubliclyAccessible").as_deref())?;
    let tde_credential_arn = optional_query_param(request, "TdeCredentialArn");
    let db_port_number =
        parse_optional_i32(optional_query_param(request, "DBPortNumber").as_deref())?;
    let cloudwatch_enable = collect_cloudwatch_log_types(request, "EnableLogTypes");
    let cloudwatch_disable = collect_cloudwatch_log_types(request, "DisableLogTypes");
    let cloudwatch_changed = !cloudwatch_enable.is_empty() || !cloudwatch_disable.is_empty();

    // Collects `<prefix>.1`, `<prefix>.2`, ... until the first missing index and
    // yields `None` when no entry was supplied at all.
    let indexed_list = |prefix: &str| -> Option<Vec<String>> {
        let values: Vec<String> = (1..)
            .map_while(|index| optional_query_param(request, &format!("{prefix}.{index}")))
            .collect();
        if values.is_empty() {
            None
        } else {
            Some(values)
        }
    };
    let vpc_security_group_ids = indexed_list("VpcSecurityGroupIds.VpcSecurityGroupId");
    let db_security_groups = indexed_list("DBSecurityGroups.DBSecurityGroupName");

    if let Some(ref class) = db_instance_class {
        validate_db_instance_class(class)?;
    }

    let mut accounts = self.state.write();
    let state = accounts.get_or_create(&request.account_id);
    let instance = state
        .instances
        .get_mut(&db_instance_identifier)
        .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;

    // --- Settings that are always applied right away -------------------------------
    if let Some(deletion_protection) = deletion_protection {
        instance.deletion_protection = deletion_protection;
    }
    if let Some(security_group_ids) = vpc_security_group_ids {
        instance.vpc_security_group_ids = security_group_ids;
    }
    if let Some(sg_names) = db_security_groups {
        instance.db_security_groups = sg_names;
    }
    if let Some(ca_id) = ca_certificate_identifier {
        instance.ca_certificate_identifier = Some(ca_id);
    }
    if let Some(kms_key) = master_user_secret_kms_key_id {
        instance.master_user_secret_kms_key_id = Some(kms_key);
    }
    if let Some(name) = option_group_name {
        instance.option_group_name = Some(name);
    }
    if let Some(b) = auto_minor_version_upgrade {
        instance.auto_minor_version_upgrade = Some(b);
    }
    if let Some(b) = copy_tags_to_snapshot {
        instance.copy_tags_to_snapshot = Some(b);
    }
    if let Some(b) = delete_automated_backups {
        instance.delete_automated_backups = Some(b);
    }
    if let Some(b) = enable_iam_db_auth {
        instance.iam_database_authentication_enabled = b;
    }
    if let Some(n) = max_allocated_storage {
        instance.max_allocated_storage = Some(n);
    }
    if let Some(nt) = network_type {
        instance.network_type = Some(nt);
    }
    if disable_domain == Some(true) {
        // Disabling the domain wins over any domain values sent in the same request.
        instance.domain = None;
        instance.domain_fqdn = None;
        instance.domain_ou = None;
        instance.domain_iam_role_name = None;
        instance.domain_auth_secret_arn = None;
        instance.domain_dns_ips.clear();
    } else {
        if let Some(v) = domain {
            instance.domain = Some(v);
        }
        if let Some(v) = domain_fqdn {
            instance.domain_fqdn = Some(v);
        }
        if let Some(v) = domain_ou {
            instance.domain_ou = Some(v);
        }
        if let Some(v) = domain_iam_role_name {
            instance.domain_iam_role_name = Some(v);
        }
        if let Some(v) = domain_auth_secret_arn {
            instance.domain_auth_secret_arn = Some(v);
        }
        if let Some(v) = domain_dns_ips {
            instance.domain_dns_ips = v;
        }
    }
    if cloudwatch_changed {
        // Removals are applied first, then additions (without duplicates), so a log
        // type named in both lists ends up enabled.
        let mut current: Vec<String> = instance.enabled_cloudwatch_logs_exports.clone();
        current.retain(|t| !cloudwatch_disable.contains(t));
        for t in &cloudwatch_enable {
            if !current.contains(t) {
                current.push(t.clone());
            }
        }
        instance.enabled_cloudwatch_logs_exports = current;
    }
    if rotate_master_user_password == Some(true) {
        instance.master_user_password = format!("rotated-{}", uuid::Uuid::new_v4().simple());
    }

    // --- Settings that honour ApplyImmediately -------------------------------------
    // Only an explicit `false` defers; an absent value is treated as immediate.
    let immediate = apply_immediately != Some(false);
    if immediate {
        if let Some(class) = db_instance_class {
            instance.db_instance_class = class;
        }
        if let Some(pwd) = master_user_password {
            // An explicit password overrides the rotated one set above.
            instance.master_user_password = pwd;
        }
        if let Some(version) = engine_version {
            instance.engine_version = version;
        }
        if let Some(storage) = allocated_storage {
            instance.allocated_storage = storage;
        }
        if let Some(name) = db_parameter_group_name {
            instance.db_parameter_group_name = Some(name);
        }
        if let Some(az) = multi_az {
            instance.multi_az = az;
        }
        if let Some(iops_val) = iops {
            instance.iops = Some(iops_val);
        }
        if let Some(stype) = storage_type {
            instance.storage_type = Some(stype);
        }
        if let Some(t) = storage_throughput {
            instance.storage_throughput = Some(t);
        }
        if let Some(pi) = performance_insights_enabled {
            instance.performance_insights_enabled = pi;
        }
        if let Some(lm) = license_model {
            instance.license_model = Some(lm);
        }
        if let Some(b) = multi_tenant {
            instance.multi_tenant = Some(b);
        }
        if let Some(b) = publicly_accessible {
            instance.publicly_accessible = b;
        }
        if let Some(arn) = tde_credential_arn {
            instance.tde_credential_arn = Some(arn);
        }
        if let Some(p) = db_port_number {
            instance.port = p;
        }
        if let Some(retention) = backup_retention_period {
            instance.backup_retention_period = retention;
        }
        if let Some(window) = preferred_backup_window {
            instance.preferred_backup_window = window;
        }
        if let Some(window) = preferred_maintenance_window {
            instance.preferred_maintenance_window = Some(window);
        }
        if let Some(interval) = monitoring_interval {
            instance.monitoring_interval = Some(interval);
        }
    } else {
        // Checked up front so `pending_modified_values` stays `None` when the request
        // carries no deferrable setting.
        let any_deferred = db_instance_class.is_some()
            || master_user_password.is_some()
            || engine_version.is_some()
            || allocated_storage.is_some()
            || db_parameter_group_name.is_some()
            || multi_az.is_some()
            || iops.is_some()
            || storage_type.is_some()
            || storage_throughput.is_some()
            || performance_insights_enabled.is_some()
            || license_model.is_some()
            || multi_tenant.is_some()
            || publicly_accessible.is_some()
            || tde_credential_arn.is_some()
            || db_port_number.is_some()
            || backup_retention_period.is_some()
            || preferred_backup_window.is_some()
            || preferred_maintenance_window.is_some()
            || monitoring_interval.is_some();
        if any_deferred {
            // New values are merged into whatever is already pending.
            let pending = instance
                .pending_modified_values
                .get_or_insert(Default::default());
            if let Some(class) = db_instance_class {
                pending.db_instance_class = Some(class);
            }
            if let Some(pwd) = master_user_password {
                pending.master_user_password = Some(pwd);
            }
            if let Some(version) = engine_version {
                pending.engine_version = Some(version);
            }
            if let Some(storage) = allocated_storage {
                pending.allocated_storage = Some(storage);
            }
            if let Some(name) = db_parameter_group_name {
                pending.db_parameter_group_name = Some(name);
            }
            if let Some(az) = multi_az {
                pending.multi_az = Some(az);
            }
            if let Some(iops_val) = iops {
                pending.iops = Some(iops_val);
            }
            if let Some(stype) = storage_type {
                pending.storage_type = Some(stype);
            }
            if let Some(t) = storage_throughput {
                pending.storage_throughput = Some(t);
            }
            if let Some(pi) = performance_insights_enabled {
                pending.performance_insights_enabled = Some(pi);
            }
            if let Some(lm) = license_model {
                pending.license_model = Some(lm);
            }
            if let Some(b) = multi_tenant {
                pending.multi_tenant = Some(b);
            }
            if let Some(b) = publicly_accessible {
                pending.publicly_accessible = Some(b);
            }
            if let Some(arn) = tde_credential_arn {
                pending.tde_credential_arn = Some(arn);
            }
            if let Some(p) = db_port_number {
                pending.port = Some(p);
            }
            if let Some(retention) = backup_retention_period {
                pending.backup_retention_period = Some(retention);
            }
            if let Some(window) = preferred_backup_window {
                pending.preferred_backup_window = Some(window);
            }
            if let Some(window) = preferred_maintenance_window {
                pending.preferred_maintenance_window = Some(window);
            }
            if let Some(interval) = monitoring_interval {
                pending.monitoring_interval = Some(interval);
            }
        }
    }

    let instance_arn = instance.db_instance_arn.clone();
    let xml = query_response_xml(
        "ModifyDBInstance",
        RDS_NS,
        &format!(
            "<DBInstance>{}</DBInstance>",
            db_instance_xml(instance, Some("modifying"))
        ),
        &request.request_id,
    );
    // Release the state lock before emitting the event.
    drop(accounts);
    self.emit_event(
        RdsSourceType::DbInstance,
        &db_instance_identifier,
        &instance_arn,
        "RDS-EVENT-0014",
        &["configuration change"],
        "DB instance was modified",
    );
    Ok(AwsResponse::xml(StatusCode::OK, xml))
}
/// Handles the `RebootDBInstance` action.
///
/// Restarts the backing container for the instance, stores the endpoint the
/// runtime reports afterwards, applies any `pending_modified_values` via
/// `apply_pending_to_instance`, and emits `RDS-EVENT-0006`.
///
/// # Errors
///
/// Returns `InvalidDBInstanceState` when `ForceFailover=true` is requested,
/// the error produced by `db_instance_not_found` when the instance is unknown
/// (before or after the restart), the error from `require_runtime`, and any
/// restart failure mapped through `runtime_error_to_service_error`.
pub(super) async fn reboot_db_instance(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
    let force_failover_param = optional_query_param(request, "ForceFailover");
    if let Some(true) = parse_optional_bool(force_failover_param.as_deref())? {
        return Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "InvalidDBInstanceState",
            "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
        ));
    }

    // Work on a copy so the read lock is not held across the restart.
    let before_restart = {
        let accounts = self.state.read();
        let empty = RdsState::new(&request.account_id, &request.region);
        let state = accounts.get(&request.account_id).unwrap_or(&empty);
        let copy = match state.instances.get(&db_instance_identifier) {
            Some(found) => found.clone(),
            None => return Err(db_instance_not_found(&db_instance_identifier)),
        };
        copy
    };

    let runtime = self.require_runtime()?;
    let database_name = before_restart
        .db_name
        .as_deref()
        .unwrap_or(default_db_name(&before_restart.engine));
    let restart_outcome = runtime
        .restart_container(
            &db_instance_identifier,
            &before_restart.engine,
            &before_restart.master_username,
            &before_restart.master_user_password,
            database_name,
        )
        .await;
    let running = restart_outcome.map_err(runtime_error_to_service_error)?;

    let after_restart = {
        let mut accounts = self.state.write();
        let state = accounts.get_or_create(&request.account_id);
        // The instance may have been removed while the container was restarting.
        let entry = match state.instances.get_mut(&db_instance_identifier) {
            Some(entry) => entry,
            None => return Err(db_instance_not_found(&db_instance_identifier)),
        };
        entry.endpoint_address = running.endpoint_address.clone();
        entry.port = i32::from(running.endpoint_port);
        entry.host_port = running.host_port;
        // Pending changes are applied after the endpoint refresh.
        if let Some(pending) = entry.pending_modified_values.take() {
            apply_pending_to_instance(entry, pending);
        }
        entry.clone()
    };

    self.emit_event(
        RdsSourceType::DbInstance,
        &db_instance_identifier,
        &after_restart.db_instance_arn,
        "RDS-EVENT-0006",
        &["availability"],
        "DB instance restarted",
    );

    let payload = format!(
        "<DBInstance>{}</DBInstance>",
        db_instance_xml(&after_restart, Some("rebooting"))
    );
    let body = query_response_xml("RebootDBInstance", RDS_NS, &payload, &request.request_id);
    Ok(AwsResponse::xml(StatusCode::OK, body))
}
/// Handles the `DescribeDBInstances` action.
///
/// With `DBInstanceIdentifier` set, returns exactly that instance. Otherwise
/// returns the account's instances ordered by creation time and then by
/// identifier, paged through `paginate` using `Marker` and `MaxRecords`.
///
/// # Errors
///
/// Returns the error produced by `db_instance_not_found` when the requested
/// identifier is unknown, and any error returned by `paginate`.
pub(super) fn describe_db_instances(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let db_instance_identifier = optional_query_param(request, "DBInstanceIdentifier");
    let marker = optional_query_param(request, "Marker");
    let max_records = optional_query_param(request, "MaxRecords");

    let accounts = self.state.read();
    let empty = RdsState::new(&request.account_id, &request.region);
    let state = accounts.get(&request.account_id).unwrap_or(&empty);

    let payload = match db_instance_identifier {
        Some(identifier) => {
            // Single-instance lookup: no sorting or pagination involved.
            let found = match state.instances.get(&identifier) {
                Some(found) => found,
                None => return Err(db_instance_not_found(&identifier)),
            };
            format!(
                "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
                db_instance_xml(found, None)
            )
        }
        None => {
            let mut listing: Vec<DbInstance> = state.instances.values().cloned().collect();
            // Oldest first; the identifier breaks ties between equal timestamps.
            listing.sort_by(|left, right| {
                left.created_at
                    .cmp(&right.created_at)
                    .then_with(|| left.db_instance_identifier.cmp(&right.db_instance_identifier))
            });
            let page = paginate(listing, marker, max_records, |inst| {
                &inst.db_instance_identifier
            })?;

            let mut members = String::new();
            for item in page.items.iter() {
                members.push_str(&format!(
                    "<DBInstance>{}</DBInstance>",
                    db_instance_xml(item, None)
                ));
            }
            // The marker element is only emitted when `paginate` reports a next marker.
            let marker_xml = match page.next_marker.as_ref() {
                Some(m) => format!("<Marker>{}</Marker>", xml_escape(m)),
                None => String::new(),
            };
            format!("<DBInstances>{}</DBInstances>{}", members, marker_xml)
        }
    };

    let body = query_response_xml(
        "DescribeDBInstances",
        RDS_NS,
        &payload,
        &request.request_id,
    );
    Ok(AwsResponse::xml(StatusCode::OK, body))
}
/// Handles the `DescribeOrderableDBInstanceOptions` action.
///
/// Passes the result of `default_orderable_options` through
/// `filter_orderable_options` with the optional `Engine`, `EngineVersion`,
/// `DBInstanceClass`, `LicenseModel` and `Vpc` parameters, then renders each
/// remaining entry with `orderable_option_xml`.
///
/// # Errors
///
/// Returns an error when the `Vpc` parameter fails to parse as a boolean.
pub(super) fn describe_orderable_db_instance_options(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let engine = optional_query_param(request, "Engine");
    let engine_version = optional_query_param(request, "EngineVersion");
    let db_instance_class = optional_query_param(request, "DBInstanceClass");
    let license_model = optional_query_param(request, "LicenseModel");
    let vpc_param = optional_query_param(request, "Vpc");
    let vpc = parse_optional_bool(vpc_param.as_deref())?;

    let catalog = default_orderable_options();
    let matching = filter_orderable_options(
        &catalog,
        &engine,
        &engine_version,
        &db_instance_class,
        &license_model,
        vpc,
    );

    let entries: String = matching.iter().map(orderable_option_xml).collect();
    let payload = format!(
        "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
        entries
    );
    let body = query_response_xml(
        "DescribeOrderableDBInstanceOptions",
        RDS_NS,
        &payload,
        &request.request_id,
    );
    Ok(AwsResponse::xml(StatusCode::OK, body))
}
}