fakecloud-elasticache 0.22.0

Amazon ElastiCache implementation for FakeCloud
Documentation
//! ElastiCache `serverless` family handlers extracted from service.rs
//! by audit-2026-05-19 file-split.

use super::*;

impl ElastiCacheService {
    pub(super) async fn create_serverless_cache(
        &self,
        request: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let serverless_cache_name = required_query_param(request, "ServerlessCacheName")?;
        let engine = required_query_param(request, "Engine")?;
        validate_serverless_engine(&engine)?;

        let description = optional_query_param(request, "Description").unwrap_or_default();
        let major_engine_version = optional_query_param(request, "MajorEngineVersion")
            .unwrap_or_else(|| default_major_engine_version(&engine).to_string());
        let full_engine_version = default_full_engine_version(&engine, &major_engine_version)?;
        let cache_usage_limits = parse_cache_usage_limits(request)?;
        let security_group_ids =
            parse_query_list_param(request, "SecurityGroupIds", "SecurityGroupId");
        let subnet_ids = parse_query_list_param(request, "SubnetIds", "SubnetId");
        let kms_key_id = optional_query_param(request, "KmsKeyId");
        let user_group_id = optional_query_param(request, "UserGroupId");
        let snapshot_retention_limit =
            optional_non_negative_i32_param(request, "SnapshotRetentionLimit")?;
        let daily_snapshot_time = optional_query_param(request, "DailySnapshotTime");
        let tags = parse_tags(request)?;

        let (arn, endpoint_address) = {
            let mut accounts = self.state.write();
            let state = accounts.get_or_create(&request.account_id);
            if !state.begin_serverless_cache_creation(&serverless_cache_name) {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ServerlessCacheAlreadyExistsFault",
                    format!("ServerlessCache {serverless_cache_name} already exists."),
                ));
            }

            if let Some(ref group_id) = user_group_id {
                let user_group_status = match state.user_groups.get(group_id) {
                    Some(user_group) => user_group.status.clone(),
                    None => {
                        state.cancel_serverless_cache_creation(&serverless_cache_name);
                        return Err(AwsServiceError::aws_error(
                            StatusCode::NOT_FOUND,
                            "UserGroupNotFound",
                            format!("User group {group_id} not found."),
                        ));
                    }
                };
                if user_group_status != "active" {
                    state.cancel_serverless_cache_creation(&serverless_cache_name);
                    return Err(AwsServiceError::aws_error(
                        StatusCode::BAD_REQUEST,
                        "InvalidUserGroupState",
                        format!("User group {group_id} is not in active state."),
                    ));
                }
            }

            let arn = format!(
                "arn:aws:elasticache:{}:{}:serverlesscache:{}",
                state.region, state.account_id, serverless_cache_name
            );
            (arn, "127.0.0.1".to_string())
        };

        // The backing container is started off the request path (below); the
        // cache begins with no endpoint port and is filled in once ready, so the
        // response doesn't block on the cold image pull + readiness (which made
        // the AWS CLI hit its 60s read timeout). bug-audit 2026-05-28, 3.2.
        let endpoint = ServerlessCacheEndpoint {
            address: endpoint_address.clone(),
            port: 0,
        };
        let reader_endpoint = ServerlessCacheEndpoint {
            address: endpoint_address,
            port: 0,
        };
        let cache = ServerlessCache {
            serverless_cache_name: serverless_cache_name.clone(),
            description,
            engine,
            major_engine_version,
            full_engine_version,
            status: if self.runtime.is_some() {
                "creating".to_string()
            } else {
                "available".to_string()
            },
            endpoint,
            reader_endpoint,
            arn: arn.clone(),
            created_at: chrono::Utc::now().to_rfc3339(),
            cache_usage_limits,
            security_group_ids,
            subnet_ids,
            kms_key_id,
            user_group_id,
            snapshot_retention_limit,
            daily_snapshot_time,
            container_id: String::new(),
            host_port: 0,
        };

        let xml = serverless_cache_xml(&cache);
        {
            let mut accounts = self.state.write();
            let state = accounts.get_or_create(&request.account_id);
            state.finish_serverless_cache_creation(cache.clone());
            if !tags.is_empty() {
                merge_tags(state.tags.entry(arn).or_default(), &tags);
            }
        }

        // Start the backing container off the request path and flip the cache
        // to "available" (filling in the endpoint ports) once it is ready.
        if let Some(runtime) = self.runtime.clone() {
            let state = self.state.clone();
            let snapshot_store = self.snapshot_store.clone();
            let snapshot_lock = self.snapshot_lock.clone();
            let account_id = request.account_id.clone();
            let name = serverless_cache_name.clone();
            // Reserved `fakecloud-k8s/*` scheduling tags from the create
            // request (ignored on the Docker backend).
            let pod_tags: std::collections::BTreeMap<String, String> =
                tags.iter().cloned().collect();
            tokio::spawn(async move {
                let result = runtime.ensure_redis(&name, None, &pod_tags).await;
                let mut stop_container = false;
                {
                    let mut accounts = state.write();
                    if let Some(s) = accounts.get_mut(&account_id) {
                        match &result {
                            Ok(running) => {
                                if let Some(c) = s.serverless_caches.get_mut(&name) {
                                    c.status = "available".to_string();
                                    c.endpoint.address = running.endpoint_address.clone();
                                    c.endpoint.port = running.endpoint_port;
                                    c.reader_endpoint.address = running.endpoint_address.clone();
                                    c.reader_endpoint.port = running.endpoint_port;
                                    c.host_port = running.host_port;
                                    c.container_id = running.container_id.clone();
                                } else {
                                    // Deleted during startup: the container came
                                    // up but the cache is gone — reap it after
                                    // the lock is released so it isn't orphaned.
                                    stop_container = true;
                                }
                            }
                            Err(error) => {
                                tracing::error!(
                                    %error,
                                    serverless_cache_name = %name,
                                    "failed to start elasticache serverless cache container",
                                );
                                if let Some(c) = s.serverless_caches.get_mut(&name) {
                                    c.status = "create-failed".to_string();
                                }
                            }
                        }
                    } else {
                        stop_container = result.is_ok();
                    }
                }
                if stop_container {
                    runtime.stop_container(&name).await;
                }
                save_snapshot_static(state, snapshot_store, snapshot_lock).await;
            });
        }

        Ok(AwsResponse::xml(
            StatusCode::OK,
            query_response_xml(
                "CreateServerlessCache",
                ELASTICACHE_NS,
                &format!("<ServerlessCache>{xml}</ServerlessCache>"),
                &request.request_id,
            ),
        ))
    }

    pub(super) fn describe_serverless_caches(
        &self,
        request: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let serverless_cache_name = optional_query_param(request, "ServerlessCacheName");
        let max_results = optional_usize_param(request, "MaxResults")?;
        let next_token = optional_query_param(request, "NextToken");

        let accounts = self.state.read();
        let empty = ElastiCacheState::new(&request.account_id, &request.region);
        let state = accounts.get(&request.account_id).unwrap_or(&empty);
        let caches: Vec<&ServerlessCache> = if let Some(ref name) = serverless_cache_name {
            match state.serverless_caches.get(name) {
                Some(cache) => vec![cache],
                None => {
                    return Err(AwsServiceError::aws_error(
                        StatusCode::NOT_FOUND,
                        "ServerlessCacheNotFoundFault",
                        format!("ServerlessCache {name} not found."),
                    ));
                }
            }
        } else {
            let mut caches: Vec<&ServerlessCache> = state.serverless_caches.values().collect();
            caches.sort_by(|a, b| a.serverless_cache_name.cmp(&b.serverless_cache_name));
            caches
        };

        let (page, next_token) = paginate(&caches, next_token.as_deref(), max_results)?;
        let members_xml: String = page
            .iter()
            .map(|cache| format!("<member>{}</member>", serverless_cache_xml(cache)))
            .collect();
        let next_token_xml = next_token
            .map(|token| format!("<NextToken>{}</NextToken>", xml_escape(&token)))
            .unwrap_or_default();

        Ok(AwsResponse::xml(
            StatusCode::OK,
            query_response_xml(
                "DescribeServerlessCaches",
                ELASTICACHE_NS,
                &format!("<ServerlessCaches>{members_xml}</ServerlessCaches>{next_token_xml}"),
                &request.request_id,
            ),
        ))
    }

    pub(super) async fn delete_serverless_cache(
        &self,
        request: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let serverless_cache_name = required_query_param(request, "ServerlessCacheName")?;

        let cache = {
            let mut accounts = self.state.write();
            let state = accounts.get_or_create(&request.account_id);
            let cache = state
                .serverless_caches
                .remove(&serverless_cache_name)
                .ok_or_else(|| {
                    AwsServiceError::aws_error(
                        StatusCode::NOT_FOUND,
                        "ServerlessCacheNotFoundFault",
                        format!("ServerlessCache {serverless_cache_name} not found."),
                    )
                })?;
            state.tags.remove(&cache.arn);
            cache
        };

        if let Some(ref runtime) = self.runtime {
            runtime.stop_container(&serverless_cache_name).await;
        }

        let mut deleted = cache;
        deleted.status = "deleting".to_string();
        let xml = serverless_cache_xml(&deleted);

        Ok(AwsResponse::xml(
            StatusCode::OK,
            query_response_xml(
                "DeleteServerlessCache",
                ELASTICACHE_NS,
                &format!("<ServerlessCache>{xml}</ServerlessCache>"),
                &request.request_id,
            ),
        ))
    }

    pub(super) fn modify_serverless_cache(
        &self,
        request: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let serverless_cache_name = required_query_param(request, "ServerlessCacheName")?;
        let description = optional_query_param(request, "Description");
        let cache_usage_limits = parse_cache_usage_limits(request)?;
        let security_group_ids =
            parse_query_list_param(request, "SecurityGroupIds", "SecurityGroupId");
        let user_group_id = optional_query_param(request, "UserGroupId");
        let snapshot_retention_limit =
            optional_non_negative_i32_param(request, "SnapshotRetentionLimit")?;
        let daily_snapshot_time = optional_query_param(request, "DailySnapshotTime");

        let mut accounts = self.state.write();
        let state = accounts.get_or_create(&request.account_id);

        if let Some(ref group_id) = user_group_id {
            let user_group = state.user_groups.get(group_id).ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::NOT_FOUND,
                    "UserGroupNotFound",
                    format!("User group {group_id} not found."),
                )
            })?;
            if user_group.status != "active" {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidUserGroupState",
                    format!("User group {group_id} is not in active state."),
                ));
            }
        }

        let cache = state
            .serverless_caches
            .get_mut(&serverless_cache_name)
            .ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::NOT_FOUND,
                    "ServerlessCacheNotFoundFault",
                    format!("ServerlessCache {serverless_cache_name} not found."),
                )
            })?;

        if let Some(description) = description {
            cache.description = description;
        }
        if cache_usage_limits.is_some() {
            cache.cache_usage_limits = cache_usage_limits;
        }
        if !security_group_ids.is_empty() {
            cache.security_group_ids = security_group_ids;
        }
        if let Some(user_group_id) = user_group_id {
            cache.user_group_id = Some(user_group_id);
        }
        if let Some(snapshot_retention_limit) = snapshot_retention_limit {
            cache.snapshot_retention_limit = Some(snapshot_retention_limit);
        }
        if let Some(daily_snapshot_time) = daily_snapshot_time {
            cache.daily_snapshot_time = Some(daily_snapshot_time);
        }

        let xml = serverless_cache_xml(cache);
        Ok(AwsResponse::xml(
            StatusCode::OK,
            query_response_xml(
                "ModifyServerlessCache",
                ELASTICACHE_NS,
                &format!("<ServerlessCache>{xml}</ServerlessCache>"),
                &request.request_id,
            ),
        ))
    }
}