use super::*;
impl ElastiCacheService {
pub(super) async fn create_cache_cluster(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let cache_cluster_id = required_query_param(request, "CacheClusterId")?;
let engine =
optional_query_param(request, "Engine").unwrap_or_else(|| ENGINE_REDIS.to_string());
validate_engine(&engine)?;
let default_version = match engine.as_str() {
ENGINE_VALKEY => "8.0",
ENGINE_MEMCACHED => "1.6.22",
_ => "7.1",
};
let engine_version = optional_query_param(request, "EngineVersion")
.unwrap_or_else(|| default_version.to_string());
let cache_node_type = optional_query_param(request, "CacheNodeType")
.unwrap_or_else(|| "cache.t3.micro".to_string());
let num_cache_nodes = match optional_query_param(request, "NumCacheNodes") {
Some(v) => {
let n = v.parse::<i32>().map_err(|_| {
AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterValue",
format!("Invalid value for NumCacheNodes: '{v}'"),
)
})?;
if n < 1 {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterValue",
format!("NumCacheNodes must be a positive integer, got {n}"),
));
}
n
}
None => 1,
};
let cache_subnet_group_name = optional_query_param(request, "CacheSubnetGroupName")
.or_else(|| Some("default".to_string()));
let replication_group_id = optional_query_param(request, "ReplicationGroupId");
let auto_minor_version_upgrade = parse_optional_bool(
optional_query_param(request, "AutoMinorVersionUpgrade").as_deref(),
)?
.unwrap_or(true);
let cache_parameter_group_name = optional_query_param(request, "CacheParameterGroupName");
let security_group_ids =
parse_query_list_param(request, "SecurityGroupIds", "SecurityGroupId");
let cache_security_group_names =
parse_query_list_param(request, "CacheSecurityGroupNames", "CacheSecurityGroupName");
let log_delivery_configurations = parse_log_delivery_configs(request);
let transit_encryption_enabled = parse_optional_bool(
optional_query_param(request, "TransitEncryptionEnabled").as_deref(),
)?
.unwrap_or(false);
let at_rest_encryption_enabled = parse_optional_bool(
optional_query_param(request, "AtRestEncryptionEnabled").as_deref(),
)?
.unwrap_or(false);
let auth_token = optional_query_param(request, "AuthToken");
let auth_token_enabled = auth_token.is_some();
let default_port = if engine == ENGINE_MEMCACHED {
11211
} else {
6379
};
let port = optional_query_param(request, "Port")
.and_then(|v| v.parse::<u16>().ok())
.unwrap_or(default_port);
let preferred_maintenance_window =
optional_query_param(request, "PreferredMaintenanceWindow");
let preferred_availability_zones =
parse_query_list_param(request, "PreferredAvailabilityZones", "AvailabilityZone");
let notification_topic_arn = optional_query_param(request, "NotificationTopicArn");
let snapshot_arns = parse_query_list_param(request, "SnapshotArns", "SnapshotArn");
let snapshot_name = optional_query_param(request, "SnapshotName");
let snapshot_retention_limit = optional_query_param(request, "SnapshotRetentionLimit")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let snapshot_window = optional_query_param(request, "SnapshotWindow");
let outpost_mode = optional_query_param(request, "OutpostMode");
let preferred_outpost_arn = optional_query_param(request, "PreferredOutpostArn");
let network_type =
Some(optional_query_param(request, "NetworkType").unwrap_or_else(|| "ipv4".into()));
let ip_discovery =
Some(optional_query_param(request, "IpDiscovery").unwrap_or_else(|| "ipv4".into()));
let az_mode =
Some(optional_query_param(request, "AZMode").unwrap_or_else(|| "single-az".into()));
let kms_key_id = optional_query_param(request, "KmsKeyId");
let transit_encryption_mode = optional_query_param(request, "TransitEncryptionMode");
let data_tiering_enabled =
parse_optional_bool(optional_query_param(request, "DataTieringEnabled").as_deref())?;
let cluster_mode = optional_query_param(request, "ClusterMode");
let preferred_outpost_arns =
parse_query_list_param(request, "PreferredOutpostArns", "PreferredOutpostArn");
let tags = parse_tags(request)?;
let (preferred_availability_zone, arn, rdb_path) = {
let mut accounts = self.state.write();
let state = accounts.get_or_create(&request.account_id);
if !state.begin_cache_cluster_creation(&cache_cluster_id) {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"CacheClusterAlreadyExists",
format!("CacheCluster {cache_cluster_id} already exists."),
));
}
if let Some(ref subnet_group_name) = cache_subnet_group_name {
if !state.subnet_groups.contains_key(subnet_group_name) {
state.cancel_cache_cluster_creation(&cache_cluster_id);
return Err(AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"CacheSubnetGroupNotFoundFault",
format!("Cache subnet group {subnet_group_name} not found."),
));
}
}
if let Some(ref group_id) = replication_group_id {
if engine == ENGINE_MEMCACHED {
state.cancel_cache_cluster_creation(&cache_cluster_id);
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterValue",
"Replication groups are not supported for the memcached engine."
.to_string(),
));
}
if !state.replication_groups.contains_key(group_id) {
state.cancel_cache_cluster_creation(&cache_cluster_id);
return Err(AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"ReplicationGroupNotFoundFault",
format!("ReplicationGroup {group_id} not found."),
));
}
}
let rdb_path = snapshot_name
.as_ref()
.and_then(|snap_name| state.snapshots.get(snap_name))
.and_then(|snap| snap.rdb_path.clone());
let preferred_availability_zone =
optional_query_param(request, "PreferredAvailabilityZone")
.unwrap_or_else(|| format!("{}a", state.region));
let arn = format!(
"arn:aws:elasticache:{}:{}:cluster:{}",
state.region, state.account_id, cache_cluster_id
);
(preferred_availability_zone, arn, rdb_path)
};
let running = if let Some(runtime) = self.runtime.as_ref() {
let runtime_result = if engine == ENGINE_MEMCACHED {
runtime.ensure_memcached(&cache_cluster_id).await
} else {
runtime
.ensure_redis(&cache_cluster_id, rdb_path.as_deref())
.await
};
match runtime_result {
Ok(r) => r,
Err(e) => {
self.state
.write()
.get_or_create(&request.account_id)
.cancel_cache_cluster_creation(&cache_cluster_id);
return Err(runtime_error_to_service_error(e));
}
}
} else {
crate::runtime::RunningCacheContainer {
container_id: String::new(),
host_port: 0,
}
};
let cluster = CacheCluster {
cache_cluster_id: cache_cluster_id.clone(),
cache_node_type,
engine,
engine_version,
cache_cluster_status: "available".to_string(),
num_cache_nodes,
preferred_availability_zone,
cache_subnet_group_name,
auto_minor_version_upgrade,
arn,
created_at: chrono::Utc::now().to_rfc3339(),
endpoint_address: "127.0.0.1".to_string(),
endpoint_port: running.host_port,
container_id: running.container_id,
host_port: running.host_port,
replication_group_id,
cache_parameter_group_name: cache_parameter_group_name.clone(),
security_group_ids,
log_delivery_configurations,
transit_encryption_enabled,
at_rest_encryption_enabled,
auth_token_enabled,
port,
preferred_maintenance_window,
preferred_availability_zones,
notification_topic_arn,
cache_security_group_names,
snapshot_arns,
snapshot_name,
snapshot_retention_limit,
snapshot_window,
outpost_mode,
preferred_outpost_arn,
network_type,
ip_discovery,
az_mode,
auth_token,
kms_key_id,
transit_encryption_mode,
data_tiering_enabled,
cluster_mode,
preferred_outpost_arns,
};
let xml = cache_cluster_xml(&cluster, true);
{
let mut accounts = self.state.write();
let state = accounts.get_or_create(&request.account_id);
let cluster_arn = cluster.arn.clone();
state.finish_cache_cluster_creation(cluster.clone());
state.tags.entry(cluster_arn.clone()).or_default();
if !tags.is_empty() {
merge_tags(state.tags.entry(cluster_arn).or_default(), &tags);
}
if let Some(ref group_id) = cluster.replication_group_id {
add_cluster_to_replication_group(state, group_id, &cluster.cache_cluster_id);
}
}
if let Some(ref param_group) = cache_parameter_group_name {
self.apply_parameters_for_group(&request.account_id, param_group)
.await;
}
Ok(AwsResponse::xml(
StatusCode::OK,
query_response_xml(
"CreateCacheCluster",
ELASTICACHE_NS,
&format!("<CacheCluster>{xml}</CacheCluster>"),
&request.request_id,
),
))
}
pub(super) fn describe_cache_clusters(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let cache_cluster_id = optional_query_param(request, "CacheClusterId");
let show_cache_node_info =
parse_optional_bool(optional_query_param(request, "ShowCacheNodeInfo").as_deref())?
.unwrap_or(false);
let max_records = optional_usize_param(request, "MaxRecords")?;
let marker = optional_query_param(request, "Marker");
let accounts = self.state.read();
let empty = ElastiCacheState::new(&request.account_id, &request.region);
let state = accounts.get(&request.account_id).unwrap_or(&empty);
let clusters: Vec<&CacheCluster> = if let Some(ref cluster_id) = cache_cluster_id {
match state.cache_clusters.get(cluster_id) {
Some(cluster) => vec![cluster],
None => {
return Err(AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"CacheClusterNotFound",
format!("CacheCluster {cluster_id} not found."),
));
}
}
} else {
let mut clusters: Vec<&CacheCluster> = state.cache_clusters.values().collect();
clusters.sort_by(|a, b| a.cache_cluster_id.cmp(&b.cache_cluster_id));
clusters
};
let (page, next_marker) = paginate(&clusters, marker.as_deref(), max_records);
let members_xml: String = page
.iter()
.map(|cluster| {
format!(
"<CacheCluster>{}</CacheCluster>",
cache_cluster_xml(cluster, show_cache_node_info)
)
})
.collect();
let marker_xml = next_marker
.map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
.unwrap_or_default();
Ok(AwsResponse::xml(
StatusCode::OK,
query_response_xml(
"DescribeCacheClusters",
ELASTICACHE_NS,
&format!("<CacheClusters>{members_xml}</CacheClusters>{marker_xml}"),
&request.request_id,
),
))
}
pub(super) async fn delete_cache_cluster(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let cache_cluster_id = required_query_param(request, "CacheClusterId")?;
let cluster = {
let mut accounts = self.state.write();
let state = accounts.get_or_create(&request.account_id);
let cluster = state
.cache_clusters
.remove(&cache_cluster_id)
.ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"CacheClusterNotFound",
format!("CacheCluster {cache_cluster_id} not found."),
)
})?;
if let Some(ref group_id) = cluster.replication_group_id {
remove_cluster_from_replication_group(state, group_id, &cluster.cache_cluster_id);
}
state.tags.remove(&cluster.arn);
cluster
};
if let Some(ref runtime) = self.runtime {
runtime.stop_container(&cache_cluster_id).await;
}
let mut deleted_cluster = cluster;
deleted_cluster.cache_cluster_status = "deleting".to_string();
let xml = cache_cluster_xml(&deleted_cluster, true);
Ok(AwsResponse::xml(
StatusCode::OK,
query_response_xml(
"DeleteCacheCluster",
ELASTICACHE_NS,
&format!("<CacheCluster>{xml}</CacheCluster>"),
&request.request_id,
),
))
}
pub(super) fn modify_cache_cluster(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let id = required_query_param(request, "CacheClusterId")?;
let new_node_count = optional_query_param(request, "NumCacheNodes")
.as_deref()
.and_then(|v| v.parse::<i32>().ok());
let new_node_type = optional_query_param(request, "CacheNodeType");
let mut accounts = self.state.write();
let state = accounts.get_or_create(&request.account_id);
let cluster = state.cache_clusters.get_mut(&id).ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"CacheClusterNotFound",
format!("CacheCluster {id} not found."),
)
})?;
if let Some(n) = new_node_count {
cluster.num_cache_nodes = n;
}
if let Some(t) = new_node_type {
cluster.cache_node_type = t;
}
cluster.cache_cluster_status = "modifying".to_string();
let xml = cache_cluster_xml(cluster, true);
Ok(AwsResponse::xml(
StatusCode::OK,
query_response_xml(
"ModifyCacheCluster",
ELASTICACHE_NS,
&format!("<CacheCluster>{xml}</CacheCluster>"),
&request.request_id,
),
))
}
pub(super) async fn reboot_cache_cluster(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let id = required_query_param(request, "CacheClusterId")?;
let xml = {
let mut accounts = self.state.write();
let state = accounts.get_or_create(&request.account_id);
let cluster = state.cache_clusters.get_mut(&id).ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"CacheClusterNotFound",
format!("CacheCluster {id} not found."),
)
})?;
cluster.cache_cluster_status = "rebooting cache cluster nodes".to_string();
cache_cluster_xml(cluster, true)
};
if let Some(runtime) = &self.runtime {
if let Err(error) = runtime.restart_container(&id).await {
tracing::warn!(
cluster_id = %id,
%error,
"RebootCacheCluster: container restart failed, returning rebooting state anyway"
);
} else {
let mut accounts = self.state.write();
let state = accounts.get_or_create(&request.account_id);
if let Some(cluster) = state.cache_clusters.get_mut(&id) {
cluster.cache_cluster_status = "available".to_string();
}
}
}
Ok(AwsResponse::xml(
StatusCode::OK,
query_response_xml(
"RebootCacheCluster",
ELASTICACHE_NS,
&format!("<CacheCluster>{xml}</CacheCluster>"),
&request.request_id,
),
))
}
}