Skip to main content

fakecloud_elasticache/
service.rs

1use std::convert::TryFrom;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use http::StatusCode;
6
7use fakecloud_aws::xml::xml_escape;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9
10use crate::runtime::{ElastiCacheRuntime, RuntimeError};
11use crate::state::{
12    default_engine_versions, default_parameters_for_family, CacheCluster, CacheEngineVersion,
13    CacheParameterGroup, CacheSnapshot, CacheSubnetGroup, ElastiCacheState, ElastiCacheUser,
14    ElastiCacheUserGroup, EngineDefaultParameter, GlobalReplicationGroup,
15    GlobalReplicationGroupMember, RecurringCharge, ReplicationGroup, ReservedCacheNode,
16    ReservedCacheNodesOffering, ServerlessCache, ServerlessCacheDataStorage,
17    ServerlessCacheEcpuPerSecond, ServerlessCacheEndpoint, ServerlessCacheSnapshot,
18    ServerlessCacheUsageLimits, SharedElastiCacheState,
19};
20
21const ELASTICACHE_NS: &str = "http://elasticache.amazonaws.com/doc/2015-02-02/";
22
23/// Cache engine wire values. Stored as ``String`` on ``CacheCluster`` etc., but
24/// validated against this list at the wire boundary so a typo can't slip in.
25const ENGINE_REDIS: &str = "redis";
26const ENGINE_VALKEY: &str = "valkey";
27const SUPPORTED_ENGINES: &[&str] = &[ENGINE_REDIS, ENGINE_VALKEY];
28
29fn validate_engine(engine: &str) -> Result<(), AwsServiceError> {
30    if !SUPPORTED_ENGINES.contains(&engine) {
31        return Err(AwsServiceError::aws_error(
32            StatusCode::BAD_REQUEST,
33            "InvalidParameterValue",
34            format!("Invalid value for Engine: {engine}. Supported engines: redis, valkey"),
35        ));
36    }
37    Ok(())
38}
39const SUPPORTED_ACTIONS: &[&str] = &[
40    "AddTagsToResource",
41    "CreateCacheCluster",
42    "CreateGlobalReplicationGroup",
43    "CreateCacheSubnetGroup",
44    "CreateReplicationGroup",
45    "CreateServerlessCache",
46    "CreateServerlessCacheSnapshot",
47    "CreateSnapshot",
48    "CreateUser",
49    "CreateUserGroup",
50    "DecreaseReplicaCount",
51    "DeleteCacheCluster",
52    "DeleteGlobalReplicationGroup",
53    "DeleteCacheSubnetGroup",
54    "DeleteReplicationGroup",
55    "DeleteServerlessCache",
56    "DeleteServerlessCacheSnapshot",
57    "DeleteSnapshot",
58    "DeleteUser",
59    "DeleteUserGroup",
60    "DescribeCacheClusters",
61    "DescribeCacheEngineVersions",
62    "DescribeGlobalReplicationGroups",
63    "DescribeCacheParameterGroups",
64    "DescribeReservedCacheNodes",
65    "DescribeReservedCacheNodesOfferings",
66    "DescribeCacheSubnetGroups",
67    "DescribeEngineDefaultParameters",
68    "DescribeReplicationGroups",
69    "DescribeServerlessCaches",
70    "DescribeServerlessCacheSnapshots",
71    "DescribeSnapshots",
72    "DescribeUserGroups",
73    "DescribeUsers",
74    "DisassociateGlobalReplicationGroup",
75    "FailoverGlobalReplicationGroup",
76    "IncreaseReplicaCount",
77    "ListTagsForResource",
78    "ModifyCacheSubnetGroup",
79    "ModifyGlobalReplicationGroup",
80    "ModifyReplicationGroup",
81    "ModifyServerlessCache",
82    "RemoveTagsFromResource",
83    "TestFailover",
84];
85
86pub struct ElastiCacheService {
87    state: SharedElastiCacheState,
88    runtime: Option<Arc<ElastiCacheRuntime>>,
89}
90
91impl ElastiCacheService {
92    pub fn new(state: SharedElastiCacheState) -> Self {
93        Self {
94            state,
95            runtime: None,
96        }
97    }
98
99    pub fn with_runtime(mut self, runtime: Arc<ElastiCacheRuntime>) -> Self {
100        self.runtime = Some(runtime);
101        self
102    }
103}
104
105#[async_trait]
106impl AwsService for ElastiCacheService {
107    fn service_name(&self) -> &str {
108        "elasticache"
109    }
110
111    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
112        match request.action.as_str() {
113            "AddTagsToResource" => self.add_tags_to_resource(&request),
114            "CreateCacheCluster" => self.create_cache_cluster(&request).await,
115            "CreateGlobalReplicationGroup" => self.create_global_replication_group(&request),
116            "CreateCacheSubnetGroup" => self.create_cache_subnet_group(&request),
117            "CreateReplicationGroup" => self.create_replication_group(&request).await,
118            "CreateServerlessCache" => self.create_serverless_cache(&request).await,
119            "CreateServerlessCacheSnapshot" => self.create_serverless_cache_snapshot(&request),
120            "CreateSnapshot" => self.create_snapshot(&request),
121            "CreateUser" => self.create_user(&request),
122            "CreateUserGroup" => self.create_user_group(&request),
123            "DecreaseReplicaCount" => self.decrease_replica_count(&request),
124            "DeleteCacheCluster" => self.delete_cache_cluster(&request).await,
125            "DeleteGlobalReplicationGroup" => self.delete_global_replication_group(&request),
126            "DeleteCacheSubnetGroup" => self.delete_cache_subnet_group(&request),
127            "DeleteReplicationGroup" => self.delete_replication_group(&request).await,
128            "DeleteServerlessCache" => self.delete_serverless_cache(&request).await,
129            "DeleteServerlessCacheSnapshot" => self.delete_serverless_cache_snapshot(&request),
130            "DeleteSnapshot" => self.delete_snapshot(&request),
131            "DeleteUser" => self.delete_user(&request),
132            "DeleteUserGroup" => self.delete_user_group(&request),
133            "DescribeCacheClusters" => self.describe_cache_clusters(&request),
134            "DescribeCacheEngineVersions" => self.describe_cache_engine_versions(&request),
135            "DescribeGlobalReplicationGroups" => self.describe_global_replication_groups(&request),
136            "DescribeCacheParameterGroups" => self.describe_cache_parameter_groups(&request),
137            "DescribeReservedCacheNodes" => self.describe_reserved_cache_nodes(&request),
138            "DescribeReservedCacheNodesOfferings" => {
139                self.describe_reserved_cache_nodes_offerings(&request)
140            }
141            "DescribeCacheSubnetGroups" => self.describe_cache_subnet_groups(&request),
142            "DescribeEngineDefaultParameters" => self.describe_engine_default_parameters(&request),
143            "DescribeReplicationGroups" => self.describe_replication_groups(&request),
144            "DescribeServerlessCaches" => self.describe_serverless_caches(&request),
145            "DescribeServerlessCacheSnapshots" => {
146                self.describe_serverless_cache_snapshots(&request)
147            }
148            "DescribeSnapshots" => self.describe_snapshots(&request),
149            "DescribeUserGroups" => self.describe_user_groups(&request),
150            "DescribeUsers" => self.describe_users(&request),
151            "DisassociateGlobalReplicationGroup" => {
152                self.disassociate_global_replication_group(&request)
153            }
154            "FailoverGlobalReplicationGroup" => self.failover_global_replication_group(&request),
155            "IncreaseReplicaCount" => self.increase_replica_count(&request),
156            "ListTagsForResource" => self.list_tags_for_resource(&request),
157            "ModifyCacheSubnetGroup" => self.modify_cache_subnet_group(&request),
158            "ModifyGlobalReplicationGroup" => self.modify_global_replication_group(&request),
159            "ModifyReplicationGroup" => self.modify_replication_group(&request),
160            "ModifyServerlessCache" => self.modify_serverless_cache(&request),
161            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
162            "TestFailover" => self.test_failover(&request),
163            _ => Err(AwsServiceError::action_not_implemented(
164                self.service_name(),
165                &request.action,
166            )),
167        }
168    }
169
170    fn supported_actions(&self) -> &[&str] {
171        SUPPORTED_ACTIONS
172    }
173}
174
175impl ElastiCacheService {
176    fn describe_cache_engine_versions(
177        &self,
178        request: &AwsRequest,
179    ) -> Result<AwsResponse, AwsServiceError> {
180        let engine = optional_param(request, "Engine");
181        let engine_version = optional_param(request, "EngineVersion");
182        let family = optional_param(request, "CacheParameterGroupFamily");
183        let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
184        let max_records = optional_usize_param(request, "MaxRecords")?;
185        let marker = optional_param(request, "Marker");
186
187        let mut versions = filter_engine_versions(
188            &default_engine_versions(),
189            &engine,
190            &engine_version,
191            &family,
192        );
193
194        if default_only.unwrap_or(false) {
195            // Keep only one version per engine (the latest)
196            let mut seen_engines = std::collections::HashSet::new();
197            versions.retain(|v| seen_engines.insert(v.engine.clone()));
198        }
199
200        let (page, next_marker) = paginate(&versions, marker.as_deref(), max_records);
201
202        let members_xml: String = page.iter().map(engine_version_xml).collect();
203        let marker_xml = next_marker
204            .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
205            .unwrap_or_default();
206
207        Ok(AwsResponse::xml(
208            StatusCode::OK,
209            xml_wrap(
210                "DescribeCacheEngineVersions",
211                &format!("<CacheEngineVersions>{members_xml}</CacheEngineVersions>{marker_xml}"),
212                &request.request_id,
213            ),
214        ))
215    }
216
217    fn describe_cache_parameter_groups(
218        &self,
219        request: &AwsRequest,
220    ) -> Result<AwsResponse, AwsServiceError> {
221        let group_name = optional_param(request, "CacheParameterGroupName");
222        let max_records = optional_usize_param(request, "MaxRecords")?;
223        let marker = optional_param(request, "Marker");
224
225        let state = self.state.read();
226
227        let groups: Vec<&CacheParameterGroup> = state
228            .parameter_groups
229            .iter()
230            .filter(|g| {
231                group_name
232                    .as_ref()
233                    .is_none_or(|name| g.cache_parameter_group_name == *name)
234            })
235            .collect();
236
237        if let Some(ref name) = group_name {
238            if groups.is_empty() {
239                return Err(AwsServiceError::aws_error(
240                    StatusCode::NOT_FOUND,
241                    "CacheParameterGroupNotFound",
242                    format!("CacheParameterGroup {name} not found."),
243                ));
244            }
245        }
246
247        let (page, next_marker) = paginate(&groups, marker.as_deref(), max_records);
248
249        let members_xml: String = page.iter().map(|g| cache_parameter_group_xml(g)).collect();
250        let marker_xml = next_marker
251            .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
252            .unwrap_or_default();
253
254        Ok(AwsResponse::xml(
255            StatusCode::OK,
256            xml_wrap(
257                "DescribeCacheParameterGroups",
258                &format!("<CacheParameterGroups>{members_xml}</CacheParameterGroups>{marker_xml}"),
259                &request.request_id,
260            ),
261        ))
262    }
263
264    fn describe_reserved_cache_nodes(
265        &self,
266        request: &AwsRequest,
267    ) -> Result<AwsResponse, AwsServiceError> {
268        let reserved_cache_node_id = optional_param(request, "ReservedCacheNodeId");
269        let reserved_cache_nodes_offering_id =
270            optional_param(request, "ReservedCacheNodesOfferingId");
271        let cache_node_type = optional_param(request, "CacheNodeType");
272        let duration = parse_reserved_duration_filter(optional_param(request, "Duration"))?;
273        let product_description = optional_param(request, "ProductDescription");
274        let offering_type = optional_param(request, "OfferingType");
275        let max_records = optional_usize_param(request, "MaxRecords")?;
276        let marker = optional_param(request, "Marker");
277
278        let state = self.state.read();
279        let mut nodes: Vec<&ReservedCacheNode> = state.reserved_cache_nodes.values().collect();
280        nodes.retain(|node| {
281            reserved_cache_node_id
282                .as_ref()
283                .is_none_or(|expected| node.reserved_cache_node_id == *expected)
284                && reserved_cache_nodes_offering_id
285                    .as_ref()
286                    .is_none_or(|expected| node.reserved_cache_nodes_offering_id == *expected)
287                && cache_node_type
288                    .as_ref()
289                    .is_none_or(|expected| node.cache_node_type == *expected)
290                && duration.is_none_or(|expected| node.duration == expected)
291                && product_description
292                    .as_ref()
293                    .is_none_or(|expected| node.product_description == *expected)
294                && offering_type
295                    .as_ref()
296                    .is_none_or(|expected| node.offering_type == *expected)
297        });
298        nodes.sort_by(|left, right| {
299            left.reserved_cache_node_id
300                .cmp(&right.reserved_cache_node_id)
301        });
302
303        if let Some(ref id) = reserved_cache_node_id {
304            if nodes.is_empty() {
305                return Err(AwsServiceError::aws_error(
306                    StatusCode::NOT_FOUND,
307                    "ReservedCacheNodeNotFoundFault",
308                    format!("ReservedCacheNode not found: {id}"),
309                ));
310            }
311        }
312
313        let (page, next_marker) = paginate(&nodes, marker.as_deref(), max_records);
314        let members_xml: String = page
315            .iter()
316            .map(|node| reserved_cache_node_xml(node))
317            .collect();
318        let marker_xml = next_marker
319            .map(|value| format!("<Marker>{}</Marker>", xml_escape(&value)))
320            .unwrap_or_default();
321
322        Ok(AwsResponse::xml(
323            StatusCode::OK,
324            xml_wrap(
325                "DescribeReservedCacheNodes",
326                &format!("<ReservedCacheNodes>{members_xml}</ReservedCacheNodes>{marker_xml}"),
327                &request.request_id,
328            ),
329        ))
330    }
331
332    fn describe_reserved_cache_nodes_offerings(
333        &self,
334        request: &AwsRequest,
335    ) -> Result<AwsResponse, AwsServiceError> {
336        let reserved_cache_nodes_offering_id =
337            optional_param(request, "ReservedCacheNodesOfferingId");
338        let cache_node_type = optional_param(request, "CacheNodeType");
339        let duration = parse_reserved_duration_filter(optional_param(request, "Duration"))?;
340        let product_description = optional_param(request, "ProductDescription");
341        let offering_type = optional_param(request, "OfferingType");
342        let max_records = optional_usize_param(request, "MaxRecords")?;
343        let marker = optional_param(request, "Marker");
344
345        let state = self.state.read();
346        let mut offerings: Vec<&ReservedCacheNodesOffering> =
347            state.reserved_cache_nodes_offerings.iter().collect();
348        offerings.retain(|offering| {
349            reserved_cache_nodes_offering_id
350                .as_ref()
351                .is_none_or(|expected| offering.reserved_cache_nodes_offering_id == *expected)
352                && cache_node_type
353                    .as_ref()
354                    .is_none_or(|expected| offering.cache_node_type == *expected)
355                && duration.is_none_or(|expected| offering.duration == expected)
356                && product_description
357                    .as_ref()
358                    .is_none_or(|expected| offering.product_description == *expected)
359                && offering_type
360                    .as_ref()
361                    .is_none_or(|expected| offering.offering_type == *expected)
362        });
363        offerings.sort_by(|left, right| {
364            left.reserved_cache_nodes_offering_id
365                .cmp(&right.reserved_cache_nodes_offering_id)
366        });
367
368        if let Some(ref id) = reserved_cache_nodes_offering_id {
369            if offerings.is_empty() {
370                return Err(AwsServiceError::aws_error(
371                    StatusCode::NOT_FOUND,
372                    "ReservedCacheNodesOfferingNotFoundFault",
373                    format!("ReservedCacheNodesOffering not found: {id}"),
374                ));
375            }
376        }
377
378        let (page, next_marker) = paginate(&offerings, marker.as_deref(), max_records);
379        let members_xml: String = page
380            .iter()
381            .map(|offering| reserved_cache_nodes_offering_xml(offering))
382            .collect();
383        let marker_xml = next_marker
384            .map(|value| format!("<Marker>{}</Marker>", xml_escape(&value)))
385            .unwrap_or_default();
386
387        Ok(AwsResponse::xml(
388            StatusCode::OK,
389            xml_wrap(
390                "DescribeReservedCacheNodesOfferings",
391                &format!(
392                    "<ReservedCacheNodesOfferings>{members_xml}</ReservedCacheNodesOfferings>{marker_xml}"
393                ),
394                &request.request_id,
395            ),
396        ))
397    }
398
399    fn describe_engine_default_parameters(
400        &self,
401        request: &AwsRequest,
402    ) -> Result<AwsResponse, AwsServiceError> {
403        let family = required_param(request, "CacheParameterGroupFamily")?;
404        let max_records = optional_usize_param(request, "MaxRecords")?;
405        let marker = optional_param(request, "Marker");
406
407        let params = default_parameters_for_family(&family);
408        let (page, next_marker) = paginate(&params, marker.as_deref(), max_records);
409
410        let params_xml: String = page.iter().map(parameter_xml).collect();
411        let marker_xml = next_marker
412            .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
413            .unwrap_or_default();
414
415        Ok(AwsResponse::xml(
416            StatusCode::OK,
417            xml_wrap(
418                "DescribeEngineDefaultParameters",
419                &format!(
420                    "<EngineDefaults>\
421                     <CacheParameterGroupFamily>{}</CacheParameterGroupFamily>\
422                     <Parameters>{params_xml}</Parameters>\
423                     {marker_xml}\
424                     </EngineDefaults>",
425                    xml_escape(&family),
426                ),
427                &request.request_id,
428            ),
429        ))
430    }
431
432    fn create_cache_subnet_group(
433        &self,
434        request: &AwsRequest,
435    ) -> Result<AwsResponse, AwsServiceError> {
436        let name = required_param(request, "CacheSubnetGroupName")?;
437        let description = required_param(request, "CacheSubnetGroupDescription")?;
438        let subnet_ids = parse_member_list(&request.query_params, "SubnetIds", "SubnetIdentifier");
439
440        if subnet_ids.is_empty() {
441            return Err(AwsServiceError::aws_error(
442                StatusCode::BAD_REQUEST,
443                "InvalidParameterValue",
444                "At least one subnet ID must be specified.".to_string(),
445            ));
446        }
447
448        let mut state = self.state.write();
449
450        if state.subnet_groups.contains_key(&name) {
451            return Err(AwsServiceError::aws_error(
452                StatusCode::BAD_REQUEST,
453                "CacheSubnetGroupAlreadyExists",
454                format!("Cache subnet group {name} already exists."),
455            ));
456        }
457
458        let arn = format!(
459            "arn:aws:elasticache:{}:{}:subnetgroup:{}",
460            state.region, state.account_id, name
461        );
462        let vpc_id = format!(
463            "vpc-{:08x}",
464            name.as_bytes()
465                .iter()
466                .fold(0u32, |acc, &b| acc.wrapping_add(b as u32))
467        );
468
469        let group = CacheSubnetGroup {
470            cache_subnet_group_name: name.clone(),
471            cache_subnet_group_description: description,
472            vpc_id,
473            subnet_ids,
474            arn,
475        };
476
477        let xml = cache_subnet_group_xml(&group, &state.region);
478        state.register_arn(&group.arn);
479        state.subnet_groups.insert(name, group);
480
481        Ok(AwsResponse::xml(
482            StatusCode::OK,
483            xml_wrap(
484                "CreateCacheSubnetGroup",
485                &format!("<CacheSubnetGroup>{xml}</CacheSubnetGroup>"),
486                &request.request_id,
487            ),
488        ))
489    }
490
491    fn describe_cache_subnet_groups(
492        &self,
493        request: &AwsRequest,
494    ) -> Result<AwsResponse, AwsServiceError> {
495        let group_name = optional_param(request, "CacheSubnetGroupName");
496        let max_records = optional_usize_param(request, "MaxRecords")?;
497        let marker = optional_param(request, "Marker");
498
499        let state = self.state.read();
500
501        let groups: Vec<&CacheSubnetGroup> = if let Some(ref name) = group_name {
502            match state.subnet_groups.get(name) {
503                Some(g) => vec![g],
504                None => {
505                    return Err(AwsServiceError::aws_error(
506                        StatusCode::NOT_FOUND,
507                        "CacheSubnetGroupNotFoundFault",
508                        format!("Cache subnet group {name} not found."),
509                    ));
510                }
511            }
512        } else {
513            let mut groups: Vec<&CacheSubnetGroup> = state.subnet_groups.values().collect();
514            groups.sort_by(|a, b| a.cache_subnet_group_name.cmp(&b.cache_subnet_group_name));
515            groups
516        };
517
518        let (page, next_marker) = paginate(&groups, marker.as_deref(), max_records);
519
520        let members_xml: String = page
521            .iter()
522            .map(|g| {
523                format!(
524                    "<CacheSubnetGroup>{}</CacheSubnetGroup>",
525                    cache_subnet_group_xml(g, &state.region)
526                )
527            })
528            .collect();
529        let marker_xml = next_marker
530            .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
531            .unwrap_or_default();
532
533        Ok(AwsResponse::xml(
534            StatusCode::OK,
535            xml_wrap(
536                "DescribeCacheSubnetGroups",
537                &format!("<CacheSubnetGroups>{members_xml}</CacheSubnetGroups>{marker_xml}"),
538                &request.request_id,
539            ),
540        ))
541    }
542
543    fn delete_cache_subnet_group(
544        &self,
545        request: &AwsRequest,
546    ) -> Result<AwsResponse, AwsServiceError> {
547        let name = required_param(request, "CacheSubnetGroupName")?;
548
549        let mut state = self.state.write();
550
551        if name == "default" {
552            return Err(AwsServiceError::aws_error(
553                StatusCode::BAD_REQUEST,
554                "CacheSubnetGroupInUse",
555                "Cannot delete default cache subnet group.".to_string(),
556            ));
557        }
558
559        if let Some(group) = state.subnet_groups.remove(&name) {
560            state.tags.remove(&group.arn);
561        } else {
562            return Err(AwsServiceError::aws_error(
563                StatusCode::NOT_FOUND,
564                "CacheSubnetGroupNotFoundFault",
565                format!("Cache subnet group {name} not found."),
566            ));
567        }
568
569        Ok(AwsResponse::xml(
570            StatusCode::OK,
571            xml_wrap("DeleteCacheSubnetGroup", "", &request.request_id),
572        ))
573    }
574
575    fn modify_cache_subnet_group(
576        &self,
577        request: &AwsRequest,
578    ) -> Result<AwsResponse, AwsServiceError> {
579        let name = required_param(request, "CacheSubnetGroupName")?;
580        let description = optional_param(request, "CacheSubnetGroupDescription");
581        let subnet_ids = parse_member_list(&request.query_params, "SubnetIds", "SubnetIdentifier");
582
583        let mut state = self.state.write();
584        let region = state.region.clone();
585
586        let group = state.subnet_groups.get_mut(&name).ok_or_else(|| {
587            AwsServiceError::aws_error(
588                StatusCode::NOT_FOUND,
589                "CacheSubnetGroupNotFoundFault",
590                format!("Cache subnet group {name} not found."),
591            )
592        })?;
593
594        if let Some(desc) = description {
595            group.cache_subnet_group_description = desc;
596        }
597        if !subnet_ids.is_empty() {
598            group.subnet_ids = subnet_ids;
599        }
600
601        let xml = cache_subnet_group_xml(group, &region);
602
603        Ok(AwsResponse::xml(
604            StatusCode::OK,
605            xml_wrap(
606                "ModifyCacheSubnetGroup",
607                &format!("<CacheSubnetGroup>{xml}</CacheSubnetGroup>"),
608                &request.request_id,
609            ),
610        ))
611    }
612
613    async fn create_cache_cluster(
614        &self,
615        request: &AwsRequest,
616    ) -> Result<AwsResponse, AwsServiceError> {
617        let cache_cluster_id = required_param(request, "CacheClusterId")?;
618        let engine = optional_param(request, "Engine").unwrap_or_else(|| ENGINE_REDIS.to_string());
619        validate_engine(&engine)?;
620
621        let default_version = if engine == ENGINE_VALKEY {
622            "8.0"
623        } else {
624            "7.1"
625        };
626        let engine_version =
627            optional_param(request, "EngineVersion").unwrap_or_else(|| default_version.to_string());
628        let cache_node_type = optional_param(request, "CacheNodeType")
629            .unwrap_or_else(|| "cache.t3.micro".to_string());
630        let num_cache_nodes = match optional_param(request, "NumCacheNodes") {
631            Some(v) => {
632                let n = v.parse::<i32>().map_err(|_| {
633                    AwsServiceError::aws_error(
634                        StatusCode::BAD_REQUEST,
635                        "InvalidParameterValue",
636                        format!("Invalid value for NumCacheNodes: '{v}'"),
637                    )
638                })?;
639                if n < 1 {
640                    return Err(AwsServiceError::aws_error(
641                        StatusCode::BAD_REQUEST,
642                        "InvalidParameterValue",
643                        format!("NumCacheNodes must be a positive integer, got {n}"),
644                    ));
645                }
646                n
647            }
648            None => 1,
649        };
650        let cache_subnet_group_name =
651            optional_param(request, "CacheSubnetGroupName").or_else(|| Some("default".to_string()));
652        let replication_group_id = optional_param(request, "ReplicationGroupId");
653        let auto_minor_version_upgrade =
654            parse_optional_bool(optional_param(request, "AutoMinorVersionUpgrade").as_deref())?
655                .unwrap_or(true);
656
657        let (preferred_availability_zone, arn) = {
658            let mut state = self.state.write();
659            if !state.begin_cache_cluster_creation(&cache_cluster_id) {
660                return Err(AwsServiceError::aws_error(
661                    StatusCode::BAD_REQUEST,
662                    "CacheClusterAlreadyExists",
663                    format!("CacheCluster {cache_cluster_id} already exists."),
664                ));
665            }
666
667            if let Some(ref subnet_group_name) = cache_subnet_group_name {
668                if !state.subnet_groups.contains_key(subnet_group_name) {
669                    state.cancel_cache_cluster_creation(&cache_cluster_id);
670                    return Err(AwsServiceError::aws_error(
671                        StatusCode::NOT_FOUND,
672                        "CacheSubnetGroupNotFoundFault",
673                        format!("Cache subnet group {subnet_group_name} not found."),
674                    ));
675                }
676            }
677
678            if let Some(ref group_id) = replication_group_id {
679                if !state.replication_groups.contains_key(group_id) {
680                    state.cancel_cache_cluster_creation(&cache_cluster_id);
681                    return Err(AwsServiceError::aws_error(
682                        StatusCode::NOT_FOUND,
683                        "ReplicationGroupNotFoundFault",
684                        format!("ReplicationGroup {group_id} not found."),
685                    ));
686                }
687            }
688
689            let preferred_availability_zone = optional_param(request, "PreferredAvailabilityZone")
690                .unwrap_or_else(|| format!("{}a", state.region));
691            let arn = format!(
692                "arn:aws:elasticache:{}:{}:cluster:{}",
693                state.region, state.account_id, cache_cluster_id
694            );
695            (preferred_availability_zone, arn)
696        };
697
698        let runtime = self.runtime.as_ref().ok_or_else(|| {
699            self.state
700                .write()
701                .cancel_cache_cluster_creation(&cache_cluster_id);
702            AwsServiceError::aws_error(
703                StatusCode::SERVICE_UNAVAILABLE,
704                "InvalidParameterValue",
705                "Docker/Podman is required for ElastiCache cache clusters but is not available"
706                    .to_string(),
707            )
708        })?;
709
710        let running = match runtime.ensure_redis(&cache_cluster_id).await {
711            Ok(r) => r,
712            Err(e) => {
713                self.state
714                    .write()
715                    .cancel_cache_cluster_creation(&cache_cluster_id);
716                return Err(runtime_error_to_service_error(e));
717            }
718        };
719
720        let cluster = CacheCluster {
721            cache_cluster_id: cache_cluster_id.clone(),
722            cache_node_type,
723            engine,
724            engine_version,
725            cache_cluster_status: "available".to_string(),
726            num_cache_nodes,
727            preferred_availability_zone,
728            cache_subnet_group_name,
729            auto_minor_version_upgrade,
730            arn,
731            created_at: chrono::Utc::now().to_rfc3339(),
732            endpoint_address: "127.0.0.1".to_string(),
733            endpoint_port: running.host_port,
734            container_id: running.container_id,
735            host_port: running.host_port,
736            replication_group_id,
737        };
738
739        let xml = cache_cluster_xml(&cluster, true);
740        {
741            let mut state = self.state.write();
742            state.finish_cache_cluster_creation(cluster.clone());
743            if let Some(ref group_id) = cluster.replication_group_id {
744                add_cluster_to_replication_group(&mut state, group_id, &cluster.cache_cluster_id);
745            }
746        }
747
748        Ok(AwsResponse::xml(
749            StatusCode::OK,
750            xml_wrap(
751                "CreateCacheCluster",
752                &format!("<CacheCluster>{xml}</CacheCluster>"),
753                &request.request_id,
754            ),
755        ))
756    }
757
758    fn describe_cache_clusters(
759        &self,
760        request: &AwsRequest,
761    ) -> Result<AwsResponse, AwsServiceError> {
762        let cache_cluster_id = optional_param(request, "CacheClusterId");
763        let show_cache_node_info =
764            parse_optional_bool(optional_param(request, "ShowCacheNodeInfo").as_deref())?
765                .unwrap_or(false);
766        let max_records = optional_usize_param(request, "MaxRecords")?;
767        let marker = optional_param(request, "Marker");
768
769        let state = self.state.read();
770        let clusters: Vec<&CacheCluster> = if let Some(ref cluster_id) = cache_cluster_id {
771            match state.cache_clusters.get(cluster_id) {
772                Some(cluster) => vec![cluster],
773                None => {
774                    return Err(AwsServiceError::aws_error(
775                        StatusCode::NOT_FOUND,
776                        "CacheClusterNotFound",
777                        format!("CacheCluster {cluster_id} not found."),
778                    ));
779                }
780            }
781        } else {
782            let mut clusters: Vec<&CacheCluster> = state.cache_clusters.values().collect();
783            clusters.sort_by(|a, b| a.cache_cluster_id.cmp(&b.cache_cluster_id));
784            clusters
785        };
786
787        let (page, next_marker) = paginate(&clusters, marker.as_deref(), max_records);
788        let members_xml: String = page
789            .iter()
790            .map(|cluster| {
791                format!(
792                    "<CacheCluster>{}</CacheCluster>",
793                    cache_cluster_xml(cluster, show_cache_node_info)
794                )
795            })
796            .collect();
797        let marker_xml = next_marker
798            .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
799            .unwrap_or_default();
800
801        Ok(AwsResponse::xml(
802            StatusCode::OK,
803            xml_wrap(
804                "DescribeCacheClusters",
805                &format!("<CacheClusters>{members_xml}</CacheClusters>{marker_xml}"),
806                &request.request_id,
807            ),
808        ))
809    }
810
811    async fn delete_cache_cluster(
812        &self,
813        request: &AwsRequest,
814    ) -> Result<AwsResponse, AwsServiceError> {
815        let cache_cluster_id = required_param(request, "CacheClusterId")?;
816
817        let cluster = {
818            let mut state = self.state.write();
819            let cluster = state
820                .cache_clusters
821                .remove(&cache_cluster_id)
822                .ok_or_else(|| {
823                    AwsServiceError::aws_error(
824                        StatusCode::NOT_FOUND,
825                        "CacheClusterNotFound",
826                        format!("CacheCluster {cache_cluster_id} not found."),
827                    )
828                })?;
829            if let Some(ref group_id) = cluster.replication_group_id {
830                remove_cluster_from_replication_group(
831                    &mut state,
832                    group_id,
833                    &cluster.cache_cluster_id,
834                );
835            }
836            state.tags.remove(&cluster.arn);
837            cluster
838        };
839
840        if let Some(ref runtime) = self.runtime {
841            runtime.stop_container(&cache_cluster_id).await;
842        }
843
844        let mut deleted_cluster = cluster;
845        deleted_cluster.cache_cluster_status = "deleting".to_string();
846        let xml = cache_cluster_xml(&deleted_cluster, true);
847
848        Ok(AwsResponse::xml(
849            StatusCode::OK,
850            xml_wrap(
851                "DeleteCacheCluster",
852                &format!("<CacheCluster>{xml}</CacheCluster>"),
853                &request.request_id,
854            ),
855        ))
856    }
857
858    async fn create_replication_group(
859        &self,
860        request: &AwsRequest,
861    ) -> Result<AwsResponse, AwsServiceError> {
862        let replication_group_id = required_param(request, "ReplicationGroupId")?;
863        let description = required_param(request, "ReplicationGroupDescription")?;
864        let engine = optional_param(request, "Engine").unwrap_or_else(|| ENGINE_REDIS.to_string());
865        validate_engine(&engine)?;
866        let default_version = if engine == ENGINE_VALKEY {
867            "8.0"
868        } else {
869            "7.1"
870        };
871        let engine_version =
872            optional_param(request, "EngineVersion").unwrap_or_else(|| default_version.to_string());
873        let cache_node_type = optional_param(request, "CacheNodeType")
874            .unwrap_or_else(|| "cache.t3.micro".to_string());
875        let num_cache_clusters = match optional_param(request, "NumCacheClusters") {
876            Some(v) => {
877                let n = v.parse::<i32>().map_err(|_| {
878                    AwsServiceError::aws_error(
879                        StatusCode::BAD_REQUEST,
880                        "InvalidParameterValue",
881                        format!("Invalid value for NumCacheClusters: '{v}'"),
882                    )
883                })?;
884                if n < 1 {
885                    return Err(AwsServiceError::aws_error(
886                        StatusCode::BAD_REQUEST,
887                        "InvalidParameterValue",
888                        format!("NumCacheClusters must be a positive integer, got {n}"),
889                    ));
890                }
891                n
892            }
893            None => 1,
894        };
895        let automatic_failover =
896            parse_optional_bool(optional_param(request, "AutomaticFailoverEnabled").as_deref())?
897                .unwrap_or(false);
898        // Reserve the ID under a write lock before starting the container.
899        {
900            let mut state = self.state.write();
901            if !state.begin_replication_group_creation(&replication_group_id) {
902                return Err(AwsServiceError::aws_error(
903                    StatusCode::BAD_REQUEST,
904                    "ReplicationGroupAlreadyExistsFault",
905                    format!("ReplicationGroup {replication_group_id} already exists."),
906                ));
907            }
908        }
909
910        let runtime = self.runtime.as_ref().ok_or_else(|| {
911            self.state
912                .write()
913                .cancel_replication_group_creation(&replication_group_id);
914            AwsServiceError::aws_error(
915                StatusCode::SERVICE_UNAVAILABLE,
916                "InvalidParameterValue",
917                "Docker/Podman is required for ElastiCache replication groups but is not available"
918                    .to_string(),
919            )
920        })?;
921
922        let running = match runtime.ensure_redis(&replication_group_id).await {
923            Ok(r) => r,
924            Err(e) => {
925                self.state
926                    .write()
927                    .cancel_replication_group_creation(&replication_group_id);
928                return Err(runtime_error_to_service_error(e));
929            }
930        };
931
932        let member_clusters: Vec<String> = (1..=num_cache_clusters)
933            .map(|i| format!("{replication_group_id}-{i:03}"))
934            .collect();
935
936        let (arn, region) = {
937            let state = self.state.read();
938            let arn = format!(
939                "arn:aws:elasticache:{}:{}:replicationgroup:{}",
940                state.region, state.account_id, replication_group_id
941            );
942            (arn, state.region.clone())
943        };
944
945        let group = ReplicationGroup {
946            replication_group_id: replication_group_id.clone(),
947            description,
948            global_replication_group_id: None,
949            global_replication_group_role: None,
950            status: "available".to_string(),
951            cache_node_type,
952            engine,
953            engine_version,
954            num_cache_clusters,
955            automatic_failover_enabled: automatic_failover,
956            endpoint_address: "127.0.0.1".to_string(),
957            endpoint_port: running.host_port,
958            arn,
959            created_at: chrono::Utc::now().to_rfc3339(),
960            container_id: running.container_id,
961            host_port: running.host_port,
962            member_clusters,
963            snapshot_retention_limit: 0,
964            snapshot_window: "05:00-09:00".to_string(),
965        };
966
967        let xml = replication_group_xml(&group, &region);
968        self.state.write().finish_replication_group_creation(group);
969
970        Ok(AwsResponse::xml(
971            StatusCode::OK,
972            xml_wrap(
973                "CreateReplicationGroup",
974                &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
975                &request.request_id,
976            ),
977        ))
978    }
979
980    fn create_global_replication_group(
981        &self,
982        request: &AwsRequest,
983    ) -> Result<AwsResponse, AwsServiceError> {
984        let suffix = required_param(request, "GlobalReplicationGroupIdSuffix")?;
985        let primary_replication_group_id = required_param(request, "PrimaryReplicationGroupId")?;
986        let description =
987            optional_param(request, "GlobalReplicationGroupDescription").unwrap_or_default();
988
989        let mut state = self.state.write();
990        let region = state.region.clone();
991        let account_id = state.account_id.clone();
992        let global_replication_group_id = global_replication_group_id(&region, suffix.as_str());
993        if state
994            .global_replication_groups
995            .contains_key(&global_replication_group_id)
996        {
997            return Err(AwsServiceError::aws_error(
998                StatusCode::BAD_REQUEST,
999                "GlobalReplicationGroupAlreadyExistsFault",
1000                format!("GlobalReplicationGroup {global_replication_group_id} already exists."),
1001            ));
1002        }
1003
1004        let primary_group = state
1005            .replication_groups
1006            .get_mut(&primary_replication_group_id)
1007            .ok_or_else(|| {
1008                AwsServiceError::aws_error(
1009                    StatusCode::NOT_FOUND,
1010                    "ReplicationGroupNotFoundFault",
1011                    format!("ReplicationGroup {primary_replication_group_id} not found."),
1012                )
1013            })?;
1014
1015        if primary_group.global_replication_group_id.is_some() {
1016            return Err(AwsServiceError::aws_error(
1017                StatusCode::BAD_REQUEST,
1018                "InvalidReplicationGroupStateFault",
1019                format!(
1020                    "ReplicationGroup {primary_replication_group_id} is already associated with a GlobalReplicationGroup."
1021                ),
1022            ));
1023        }
1024
1025        primary_group.global_replication_group_id = Some(global_replication_group_id.clone());
1026        primary_group.global_replication_group_role = Some("primary".to_string());
1027
1028        let group = GlobalReplicationGroup {
1029            global_replication_group_id: global_replication_group_id.clone(),
1030            global_replication_group_description: description,
1031            status: "available".to_string(),
1032            cache_node_type: primary_group.cache_node_type.clone(),
1033            engine: primary_group.engine.clone(),
1034            engine_version: primary_group.engine_version.clone(),
1035            members: vec![GlobalReplicationGroupMember {
1036                replication_group_id: primary_group.replication_group_id.clone(),
1037                replication_group_region: region.clone(),
1038                role: "primary".to_string(),
1039                automatic_failover: primary_group.automatic_failover_enabled,
1040                status: "associated".to_string(),
1041            }],
1042            cluster_enabled: false,
1043            arn: format!(
1044                "arn:aws:elasticache:{}:{}:globalreplicationgroup:{}",
1045                region, account_id, global_replication_group_id
1046            ),
1047        };
1048
1049        let xml = global_replication_group_xml(&group, true);
1050        state
1051            .global_replication_groups
1052            .insert(global_replication_group_id, group);
1053
1054        Ok(AwsResponse::xml(
1055            StatusCode::OK,
1056            xml_wrap(
1057                "CreateGlobalReplicationGroup",
1058                &format!("<GlobalReplicationGroup>{xml}</GlobalReplicationGroup>"),
1059                &request.request_id,
1060            ),
1061        ))
1062    }
1063
1064    fn describe_global_replication_groups(
1065        &self,
1066        request: &AwsRequest,
1067    ) -> Result<AwsResponse, AwsServiceError> {
1068        let global_replication_group_id = optional_param(request, "GlobalReplicationGroupId");
1069        let max_records = optional_usize_param(request, "MaxRecords")?;
1070        let marker = optional_param(request, "Marker");
1071        let show_member_info =
1072            parse_optional_bool(optional_param(request, "ShowMemberInfo").as_deref())?
1073                .unwrap_or(false);
1074
1075        let state = self.state.read();
1076        let groups: Vec<&GlobalReplicationGroup> = if let Some(ref global_replication_group_id) =
1077            global_replication_group_id
1078        {
1079            match state
1080                .global_replication_groups
1081                .get(global_replication_group_id)
1082            {
1083                Some(group) => vec![group],
1084                None => {
1085                    return Err(AwsServiceError::aws_error(
1086                        StatusCode::NOT_FOUND,
1087                        "GlobalReplicationGroupNotFoundFault",
1088                        format!("GlobalReplicationGroup {global_replication_group_id} not found."),
1089                    ));
1090                }
1091            }
1092        } else {
1093            let mut groups: Vec<&GlobalReplicationGroup> =
1094                state.global_replication_groups.values().collect();
1095            groups.sort_by(|a, b| {
1096                a.global_replication_group_id
1097                    .cmp(&b.global_replication_group_id)
1098            });
1099            groups
1100        };
1101
1102        let (page, next_marker) = paginate(&groups, marker.as_deref(), max_records);
1103        let groups_xml: String = page
1104            .iter()
1105            .map(|group| {
1106                format!(
1107                    "<GlobalReplicationGroup>{}</GlobalReplicationGroup>",
1108                    global_replication_group_xml(group, show_member_info)
1109                )
1110            })
1111            .collect();
1112        let marker_xml = next_marker
1113            .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
1114            .unwrap_or_default();
1115
1116        Ok(AwsResponse::xml(
1117            StatusCode::OK,
1118            xml_wrap(
1119                "DescribeGlobalReplicationGroups",
1120                &format!(
1121                    "<GlobalReplicationGroups>{groups_xml}</GlobalReplicationGroups>{marker_xml}"
1122                ),
1123                &request.request_id,
1124            ),
1125        ))
1126    }
1127
1128    fn describe_replication_groups(
1129        &self,
1130        request: &AwsRequest,
1131    ) -> Result<AwsResponse, AwsServiceError> {
1132        let group_id = optional_param(request, "ReplicationGroupId");
1133        let max_records = optional_usize_param(request, "MaxRecords")?;
1134        let marker = optional_param(request, "Marker");
1135
1136        let state = self.state.read();
1137        let region = state.region.clone();
1138
1139        let groups: Vec<&ReplicationGroup> = if let Some(ref id) = group_id {
1140            match state.replication_groups.get(id) {
1141                Some(g) => vec![g],
1142                None => {
1143                    return Err(AwsServiceError::aws_error(
1144                        StatusCode::NOT_FOUND,
1145                        "ReplicationGroupNotFoundFault",
1146                        format!("ReplicationGroup {id} not found."),
1147                    ));
1148                }
1149            }
1150        } else {
1151            let mut groups: Vec<&ReplicationGroup> = state.replication_groups.values().collect();
1152            groups.sort_by(|a, b| a.replication_group_id.cmp(&b.replication_group_id));
1153            groups
1154        };
1155
1156        let (page, next_marker) = paginate(&groups, marker.as_deref(), max_records);
1157
1158        let members_xml: String = page
1159            .iter()
1160            .map(|g| {
1161                format!(
1162                    "<ReplicationGroup>{}</ReplicationGroup>",
1163                    replication_group_xml(g, &region)
1164                )
1165            })
1166            .collect();
1167        let marker_xml = next_marker
1168            .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
1169            .unwrap_or_default();
1170
1171        Ok(AwsResponse::xml(
1172            StatusCode::OK,
1173            xml_wrap(
1174                "DescribeReplicationGroups",
1175                &format!("<ReplicationGroups>{members_xml}</ReplicationGroups>{marker_xml}"),
1176                &request.request_id,
1177            ),
1178        ))
1179    }
1180
1181    fn delete_global_replication_group(
1182        &self,
1183        request: &AwsRequest,
1184    ) -> Result<AwsResponse, AwsServiceError> {
1185        let global_replication_group_id = required_param(request, "GlobalReplicationGroupId")?;
1186        let retain_primary = parse_required_bool(request, "RetainPrimaryReplicationGroup")?;
1187
1188        let mut state = self.state.write();
1189        let mut group = state
1190            .global_replication_groups
1191            .remove(&global_replication_group_id)
1192            .ok_or_else(|| {
1193                AwsServiceError::aws_error(
1194                    StatusCode::NOT_FOUND,
1195                    "GlobalReplicationGroupNotFoundFault",
1196                    format!("GlobalReplicationGroup {global_replication_group_id} not found."),
1197                )
1198            })?;
1199
1200        for member in &group.members {
1201            if !retain_primary && member.role == "primary" {
1202                // Delete the primary replication group when RetainPrimaryReplicationGroup=false
1203                if let Some(rg) = state
1204                    .replication_groups
1205                    .remove(&member.replication_group_id)
1206                {
1207                    state.tags.remove(&rg.arn);
1208                }
1209            } else if let Some(replication_group) = state
1210                .replication_groups
1211                .get_mut(&member.replication_group_id)
1212            {
1213                replication_group.global_replication_group_id = None;
1214                replication_group.global_replication_group_role = None;
1215            }
1216        }
1217
1218        group.status = "deleting".to_string();
1219        let xml = global_replication_group_xml(&group, true);
1220
1221        Ok(AwsResponse::xml(
1222            StatusCode::OK,
1223            xml_wrap(
1224                "DeleteGlobalReplicationGroup",
1225                &format!("<GlobalReplicationGroup>{xml}</GlobalReplicationGroup>"),
1226                &request.request_id,
1227            ),
1228        ))
1229    }
1230
1231    async fn delete_replication_group(
1232        &self,
1233        request: &AwsRequest,
1234    ) -> Result<AwsResponse, AwsServiceError> {
1235        let replication_group_id = required_param(request, "ReplicationGroupId")?;
1236
1237        let group = {
1238            let mut state = self.state.write();
1239            let g = state
1240                .replication_groups
1241                .remove(&replication_group_id)
1242                .ok_or_else(|| {
1243                    AwsServiceError::aws_error(
1244                        StatusCode::NOT_FOUND,
1245                        "ReplicationGroupNotFoundFault",
1246                        format!("ReplicationGroup {replication_group_id} not found."),
1247                    )
1248                })?;
1249            state.tags.remove(&g.arn);
1250            g
1251        };
1252
1253        if let Some(ref runtime) = self.runtime {
1254            runtime.stop_container(&replication_group_id).await;
1255        }
1256
1257        let region = self.state.read().region.clone();
1258        let mut deleted_group = group;
1259        deleted_group.status = "deleting".to_string();
1260        let xml = replication_group_xml(&deleted_group, &region);
1261
1262        Ok(AwsResponse::xml(
1263            StatusCode::OK,
1264            xml_wrap(
1265                "DeleteReplicationGroup",
1266                &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
1267                &request.request_id,
1268            ),
1269        ))
1270    }
1271
1272    async fn create_serverless_cache(
1273        &self,
1274        request: &AwsRequest,
1275    ) -> Result<AwsResponse, AwsServiceError> {
1276        let serverless_cache_name = required_param(request, "ServerlessCacheName")?;
1277        let engine = required_param(request, "Engine")?;
1278        validate_serverless_engine(&engine)?;
1279
1280        let description = optional_param(request, "Description").unwrap_or_default();
1281        let major_engine_version = optional_param(request, "MajorEngineVersion")
1282            .unwrap_or_else(|| default_major_engine_version(&engine).to_string());
1283        let full_engine_version = default_full_engine_version(&engine, &major_engine_version)?;
1284        let cache_usage_limits = parse_cache_usage_limits(request)?;
1285        let security_group_ids =
1286            parse_query_list_param(request, "SecurityGroupIds", "SecurityGroupId");
1287        let subnet_ids = parse_query_list_param(request, "SubnetIds", "SubnetId");
1288        let kms_key_id = optional_param(request, "KmsKeyId");
1289        let user_group_id = optional_param(request, "UserGroupId");
1290        let snapshot_retention_limit =
1291            optional_non_negative_i32_param(request, "SnapshotRetentionLimit")?;
1292        let daily_snapshot_time = optional_param(request, "DailySnapshotTime");
1293        let tags = parse_tags(request)?;
1294
1295        let (arn, endpoint_address) = {
1296            let mut state = self.state.write();
1297            if !state.begin_serverless_cache_creation(&serverless_cache_name) {
1298                return Err(AwsServiceError::aws_error(
1299                    StatusCode::BAD_REQUEST,
1300                    "ServerlessCacheAlreadyExistsFault",
1301                    format!("ServerlessCache {serverless_cache_name} already exists."),
1302                ));
1303            }
1304
1305            if let Some(ref group_id) = user_group_id {
1306                let user_group_status = match state.user_groups.get(group_id) {
1307                    Some(user_group) => user_group.status.clone(),
1308                    None => {
1309                        state.cancel_serverless_cache_creation(&serverless_cache_name);
1310                        return Err(AwsServiceError::aws_error(
1311                            StatusCode::NOT_FOUND,
1312                            "UserGroupNotFound",
1313                            format!("User group {group_id} not found."),
1314                        ));
1315                    }
1316                };
1317                if user_group_status != "active" {
1318                    state.cancel_serverless_cache_creation(&serverless_cache_name);
1319                    return Err(AwsServiceError::aws_error(
1320                        StatusCode::BAD_REQUEST,
1321                        "InvalidUserGroupState",
1322                        format!("User group {group_id} is not in active state."),
1323                    ));
1324                }
1325            }
1326
1327            let arn = format!(
1328                "arn:aws:elasticache:{}:{}:serverlesscache:{}",
1329                state.region, state.account_id, serverless_cache_name
1330            );
1331            (arn, "127.0.0.1".to_string())
1332        };
1333
1334        let runtime = self.runtime.as_ref().ok_or_else(|| {
1335            self.state
1336                .write()
1337                .cancel_serverless_cache_creation(&serverless_cache_name);
1338            AwsServiceError::aws_error(
1339                StatusCode::SERVICE_UNAVAILABLE,
1340                "InvalidParameterValue",
1341                "Docker/Podman is required for ElastiCache serverless caches but is not available"
1342                    .to_string(),
1343            )
1344        })?;
1345
1346        let running = match runtime.ensure_redis(&serverless_cache_name).await {
1347            Ok(r) => r,
1348            Err(e) => {
1349                self.state
1350                    .write()
1351                    .cancel_serverless_cache_creation(&serverless_cache_name);
1352                return Err(runtime_error_to_service_error(e));
1353            }
1354        };
1355
1356        let endpoint = ServerlessCacheEndpoint {
1357            address: endpoint_address.clone(),
1358            port: running.host_port,
1359        };
1360        let reader_endpoint = ServerlessCacheEndpoint {
1361            address: endpoint_address,
1362            port: running.host_port,
1363        };
1364        let cache = ServerlessCache {
1365            serverless_cache_name: serverless_cache_name.clone(),
1366            description,
1367            engine,
1368            major_engine_version,
1369            full_engine_version,
1370            status: "available".to_string(),
1371            endpoint,
1372            reader_endpoint,
1373            arn: arn.clone(),
1374            created_at: chrono::Utc::now().to_rfc3339(),
1375            cache_usage_limits,
1376            security_group_ids,
1377            subnet_ids,
1378            kms_key_id,
1379            user_group_id,
1380            snapshot_retention_limit,
1381            daily_snapshot_time,
1382            container_id: running.container_id,
1383            host_port: running.host_port,
1384        };
1385
1386        let xml = serverless_cache_xml(&cache);
1387        {
1388            let mut state = self.state.write();
1389            state.finish_serverless_cache_creation(cache.clone());
1390            if !tags.is_empty() {
1391                merge_tags(state.tags.entry(arn).or_default(), &tags);
1392            }
1393        }
1394
1395        Ok(AwsResponse::xml(
1396            StatusCode::OK,
1397            xml_wrap(
1398                "CreateServerlessCache",
1399                &format!("<ServerlessCache>{xml}</ServerlessCache>"),
1400                &request.request_id,
1401            ),
1402        ))
1403    }
1404
1405    fn describe_serverless_caches(
1406        &self,
1407        request: &AwsRequest,
1408    ) -> Result<AwsResponse, AwsServiceError> {
1409        let serverless_cache_name = optional_param(request, "ServerlessCacheName");
1410        let max_results = optional_usize_param(request, "MaxResults")?;
1411        let next_token = optional_param(request, "NextToken");
1412
1413        let state = self.state.read();
1414        let caches: Vec<&ServerlessCache> = if let Some(ref name) = serverless_cache_name {
1415            match state.serverless_caches.get(name) {
1416                Some(cache) => vec![cache],
1417                None => {
1418                    return Err(AwsServiceError::aws_error(
1419                        StatusCode::NOT_FOUND,
1420                        "ServerlessCacheNotFoundFault",
1421                        format!("ServerlessCache {name} not found."),
1422                    ));
1423                }
1424            }
1425        } else {
1426            let mut caches: Vec<&ServerlessCache> = state.serverless_caches.values().collect();
1427            caches.sort_by(|a, b| a.serverless_cache_name.cmp(&b.serverless_cache_name));
1428            caches
1429        };
1430
1431        let (page, next_token) = paginate(&caches, next_token.as_deref(), max_results);
1432        let members_xml: String = page
1433            .iter()
1434            .map(|cache| format!("<member>{}</member>", serverless_cache_xml(cache)))
1435            .collect();
1436        let next_token_xml = next_token
1437            .map(|token| format!("<NextToken>{}</NextToken>", xml_escape(&token)))
1438            .unwrap_or_default();
1439
1440        Ok(AwsResponse::xml(
1441            StatusCode::OK,
1442            xml_wrap(
1443                "DescribeServerlessCaches",
1444                &format!("<ServerlessCaches>{members_xml}</ServerlessCaches>{next_token_xml}"),
1445                &request.request_id,
1446            ),
1447        ))
1448    }
1449
1450    async fn delete_serverless_cache(
1451        &self,
1452        request: &AwsRequest,
1453    ) -> Result<AwsResponse, AwsServiceError> {
1454        let serverless_cache_name = required_param(request, "ServerlessCacheName")?;
1455
1456        let cache = {
1457            let mut state = self.state.write();
1458            let cache = state
1459                .serverless_caches
1460                .remove(&serverless_cache_name)
1461                .ok_or_else(|| {
1462                    AwsServiceError::aws_error(
1463                        StatusCode::NOT_FOUND,
1464                        "ServerlessCacheNotFoundFault",
1465                        format!("ServerlessCache {serverless_cache_name} not found."),
1466                    )
1467                })?;
1468            state.tags.remove(&cache.arn);
1469            cache
1470        };
1471
1472        if let Some(ref runtime) = self.runtime {
1473            runtime.stop_container(&serverless_cache_name).await;
1474        }
1475
1476        let mut deleted = cache;
1477        deleted.status = "deleting".to_string();
1478        let xml = serverless_cache_xml(&deleted);
1479
1480        Ok(AwsResponse::xml(
1481            StatusCode::OK,
1482            xml_wrap(
1483                "DeleteServerlessCache",
1484                &format!("<ServerlessCache>{xml}</ServerlessCache>"),
1485                &request.request_id,
1486            ),
1487        ))
1488    }
1489
1490    fn modify_serverless_cache(
1491        &self,
1492        request: &AwsRequest,
1493    ) -> Result<AwsResponse, AwsServiceError> {
1494        let serverless_cache_name = required_param(request, "ServerlessCacheName")?;
1495        let description = optional_param(request, "Description");
1496        let cache_usage_limits = parse_cache_usage_limits(request)?;
1497        let security_group_ids =
1498            parse_query_list_param(request, "SecurityGroupIds", "SecurityGroupId");
1499        let user_group_id = optional_param(request, "UserGroupId");
1500        let snapshot_retention_limit =
1501            optional_non_negative_i32_param(request, "SnapshotRetentionLimit")?;
1502        let daily_snapshot_time = optional_param(request, "DailySnapshotTime");
1503
1504        let mut state = self.state.write();
1505
1506        if let Some(ref group_id) = user_group_id {
1507            let user_group = state.user_groups.get(group_id).ok_or_else(|| {
1508                AwsServiceError::aws_error(
1509                    StatusCode::NOT_FOUND,
1510                    "UserGroupNotFound",
1511                    format!("User group {group_id} not found."),
1512                )
1513            })?;
1514            if user_group.status != "active" {
1515                return Err(AwsServiceError::aws_error(
1516                    StatusCode::BAD_REQUEST,
1517                    "InvalidUserGroupState",
1518                    format!("User group {group_id} is not in active state."),
1519                ));
1520            }
1521        }
1522
1523        let cache = state
1524            .serverless_caches
1525            .get_mut(&serverless_cache_name)
1526            .ok_or_else(|| {
1527                AwsServiceError::aws_error(
1528                    StatusCode::NOT_FOUND,
1529                    "ServerlessCacheNotFoundFault",
1530                    format!("ServerlessCache {serverless_cache_name} not found."),
1531                )
1532            })?;
1533
1534        if let Some(description) = description {
1535            cache.description = description;
1536        }
1537        if cache_usage_limits.is_some() {
1538            cache.cache_usage_limits = cache_usage_limits;
1539        }
1540        if !security_group_ids.is_empty() {
1541            cache.security_group_ids = security_group_ids;
1542        }
1543        if let Some(user_group_id) = user_group_id {
1544            cache.user_group_id = Some(user_group_id);
1545        }
1546        if let Some(snapshot_retention_limit) = snapshot_retention_limit {
1547            cache.snapshot_retention_limit = Some(snapshot_retention_limit);
1548        }
1549        if let Some(daily_snapshot_time) = daily_snapshot_time {
1550            cache.daily_snapshot_time = Some(daily_snapshot_time);
1551        }
1552
1553        let xml = serverless_cache_xml(cache);
1554        Ok(AwsResponse::xml(
1555            StatusCode::OK,
1556            xml_wrap(
1557                "ModifyServerlessCache",
1558                &format!("<ServerlessCache>{xml}</ServerlessCache>"),
1559                &request.request_id,
1560            ),
1561        ))
1562    }
1563
1564    fn create_serverless_cache_snapshot(
1565        &self,
1566        request: &AwsRequest,
1567    ) -> Result<AwsResponse, AwsServiceError> {
1568        let serverless_cache_name = required_param(request, "ServerlessCacheName")?;
1569        let serverless_cache_snapshot_name =
1570            required_param(request, "ServerlessCacheSnapshotName")?;
1571        let kms_key_id = optional_param(request, "KmsKeyId");
1572        let tags = parse_tags(request)?;
1573
1574        let mut state = self.state.write();
1575        if state
1576            .serverless_cache_snapshots
1577            .contains_key(&serverless_cache_snapshot_name)
1578        {
1579            return Err(AwsServiceError::aws_error(
1580                StatusCode::BAD_REQUEST,
1581                "ServerlessCacheSnapshotAlreadyExistsFault",
1582                format!("ServerlessCacheSnapshot {serverless_cache_snapshot_name} already exists."),
1583            ));
1584        }
1585
1586        let cache = state
1587            .serverless_caches
1588            .get(&serverless_cache_name)
1589            .ok_or_else(|| {
1590                AwsServiceError::aws_error(
1591                    StatusCode::NOT_FOUND,
1592                    "ServerlessCacheNotFoundFault",
1593                    format!("ServerlessCache {serverless_cache_name} not found."),
1594                )
1595            })?;
1596
1597        let arn = format!(
1598            "arn:aws:elasticache:{}:{}:serverlesssnapshot:{}",
1599            state.region, state.account_id, serverless_cache_snapshot_name
1600        );
1601        let snapshot = ServerlessCacheSnapshot {
1602            serverless_cache_snapshot_name: serverless_cache_snapshot_name.clone(),
1603            arn: arn.clone(),
1604            kms_key_id: kms_key_id.or_else(|| cache.kms_key_id.clone()),
1605            snapshot_type: "manual".to_string(),
1606            status: "available".to_string(),
1607            create_time: chrono::Utc::now().to_rfc3339(),
1608            expiry_time: None,
1609            bytes_used_for_cache: None,
1610            serverless_cache_name: cache.serverless_cache_name.clone(),
1611            engine: cache.engine.clone(),
1612            major_engine_version: cache.major_engine_version.clone(),
1613        };
1614
1615        let xml = serverless_cache_snapshot_xml(&snapshot);
1616        state.tags.insert(arn.clone(), Vec::new());
1617        if !tags.is_empty() {
1618            merge_tags(state.tags.entry(arn).or_default(), &tags);
1619        }
1620        state
1621            .serverless_cache_snapshots
1622            .insert(serverless_cache_snapshot_name, snapshot);
1623
1624        Ok(AwsResponse::xml(
1625            StatusCode::OK,
1626            xml_wrap(
1627                "CreateServerlessCacheSnapshot",
1628                &format!("<ServerlessCacheSnapshot>{xml}</ServerlessCacheSnapshot>"),
1629                &request.request_id,
1630            ),
1631        ))
1632    }
1633
1634    fn describe_serverless_cache_snapshots(
1635        &self,
1636        request: &AwsRequest,
1637    ) -> Result<AwsResponse, AwsServiceError> {
1638        let serverless_cache_name = optional_param(request, "ServerlessCacheName");
1639        let serverless_cache_snapshot_name = optional_param(request, "ServerlessCacheSnapshotName");
1640        let snapshot_type = optional_param(request, "SnapshotType");
1641        let max_results = optional_usize_param(request, "MaxResults")?;
1642        let next_token = optional_param(request, "NextToken");
1643
1644        let state = self.state.read();
1645        let snapshots: Vec<&ServerlessCacheSnapshot> =
1646            if let Some(ref snapshot_name) = serverless_cache_snapshot_name {
1647                match state.serverless_cache_snapshots.get(snapshot_name) {
1648                    Some(snapshot) => vec![snapshot],
1649                    None => {
1650                        return Err(AwsServiceError::aws_error(
1651                            StatusCode::NOT_FOUND,
1652                            "ServerlessCacheSnapshotNotFoundFault",
1653                            format!("ServerlessCacheSnapshot {snapshot_name} not found."),
1654                        ));
1655                    }
1656                }
1657            } else {
1658                if let Some(ref cache_name) = serverless_cache_name {
1659                    if !state.serverless_caches.contains_key(cache_name) {
1660                        return Err(AwsServiceError::aws_error(
1661                            StatusCode::NOT_FOUND,
1662                            "ServerlessCacheNotFoundFault",
1663                            format!("ServerlessCache {cache_name} not found."),
1664                        ));
1665                    }
1666                }
1667
1668                let mut snapshots: Vec<&ServerlessCacheSnapshot> = state
1669                    .serverless_cache_snapshots
1670                    .values()
1671                    .filter(|snapshot| {
1672                        serverless_cache_name
1673                            .as_ref()
1674                            .is_none_or(|name| snapshot.serverless_cache_name == *name)
1675                    })
1676                    .filter(|snapshot| {
1677                        snapshot_type
1678                            .as_ref()
1679                            .is_none_or(|value| snapshot.snapshot_type == *value)
1680                    })
1681                    .collect();
1682                snapshots.sort_by(|a, b| {
1683                    a.serverless_cache_snapshot_name
1684                        .cmp(&b.serverless_cache_snapshot_name)
1685                });
1686                snapshots
1687            };
1688
1689        let (page, next_token) = paginate(&snapshots, next_token.as_deref(), max_results);
1690        let members_xml: String = page
1691            .iter()
1692            .map(|snapshot| {
1693                format!(
1694                    "<ServerlessCacheSnapshot>{}</ServerlessCacheSnapshot>",
1695                    serverless_cache_snapshot_xml(snapshot)
1696                )
1697            })
1698            .collect();
1699        let next_token_xml = next_token
1700            .map(|token| format!("<NextToken>{}</NextToken>", xml_escape(&token)))
1701            .unwrap_or_default();
1702
1703        Ok(AwsResponse::xml(
1704            StatusCode::OK,
1705            xml_wrap(
1706                "DescribeServerlessCacheSnapshots",
1707                &format!(
1708                    "<ServerlessCacheSnapshots>{members_xml}</ServerlessCacheSnapshots>{next_token_xml}"
1709                ),
1710                &request.request_id,
1711            ),
1712        ))
1713    }
1714
1715    fn delete_serverless_cache_snapshot(
1716        &self,
1717        request: &AwsRequest,
1718    ) -> Result<AwsResponse, AwsServiceError> {
1719        let serverless_cache_snapshot_name =
1720            required_param(request, "ServerlessCacheSnapshotName")?;
1721
1722        let mut state = self.state.write();
1723        let mut snapshot = state
1724            .serverless_cache_snapshots
1725            .remove(&serverless_cache_snapshot_name)
1726            .ok_or_else(|| {
1727                AwsServiceError::aws_error(
1728                    StatusCode::NOT_FOUND,
1729                    "ServerlessCacheSnapshotNotFoundFault",
1730                    format!("ServerlessCacheSnapshot {serverless_cache_snapshot_name} not found."),
1731                )
1732            })?;
1733        state.tags.remove(&snapshot.arn);
1734
1735        snapshot.status = "deleting".to_string();
1736        let xml = serverless_cache_snapshot_xml(&snapshot);
1737
1738        Ok(AwsResponse::xml(
1739            StatusCode::OK,
1740            xml_wrap(
1741                "DeleteServerlessCacheSnapshot",
1742                &format!("<ServerlessCacheSnapshot>{xml}</ServerlessCacheSnapshot>"),
1743                &request.request_id,
1744            ),
1745        ))
1746    }
1747
1748    fn create_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1749        let snapshot_name = required_param(request, "SnapshotName")?;
1750        let replication_group_id = optional_param(request, "ReplicationGroupId");
1751        let cache_cluster_id = optional_param(request, "CacheClusterId");
1752
1753        if replication_group_id.is_none() && cache_cluster_id.is_none() {
1754            return Err(AwsServiceError::aws_error(
1755                StatusCode::BAD_REQUEST,
1756                "InvalidParameterCombination",
1757                "At least one of ReplicationGroupId or CacheClusterId must be specified."
1758                    .to_string(),
1759            ));
1760        }
1761
1762        let mut state = self.state.write();
1763
1764        if state.snapshots.contains_key(&snapshot_name) {
1765            return Err(AwsServiceError::aws_error(
1766                StatusCode::BAD_REQUEST,
1767                "SnapshotAlreadyExistsFault",
1768                format!("Snapshot {snapshot_name} already exists."),
1769            ));
1770        }
1771
1772        // Resolve the replication group: either directly by ID or via CacheClusterId
1773        let group_id = if let Some(ref rg_id) = replication_group_id {
1774            rg_id.clone()
1775        } else {
1776            let cluster_id = cache_cluster_id.as_ref().unwrap();
1777            if let Some(cluster) = state.cache_clusters.get(cluster_id) {
1778                if let Some(group_id) = cluster.replication_group_id.clone() {
1779                    group_id
1780                } else {
1781                    return Err(AwsServiceError::aws_error(
1782                        StatusCode::BAD_REQUEST,
1783                        "InvalidParameterCombination",
1784                        format!(
1785                            "CacheCluster {cluster_id} is not associated with a replication group."
1786                        ),
1787                    ));
1788                }
1789            } else {
1790                // CacheClusterId may also map to a member cluster like "rg-001", find parent group
1791                state
1792                    .replication_groups
1793                    .values()
1794                    .find(|g| g.member_clusters.contains(cluster_id))
1795                    .map(|g| g.replication_group_id.clone())
1796                    .ok_or_else(|| {
1797                        AwsServiceError::aws_error(
1798                            StatusCode::NOT_FOUND,
1799                            "CacheClusterNotFound",
1800                            format!("CacheCluster {cluster_id} not found."),
1801                        )
1802                    })?
1803            }
1804        };
1805
1806        let group = state.replication_groups.get(&group_id).ok_or_else(|| {
1807            AwsServiceError::aws_error(
1808                StatusCode::NOT_FOUND,
1809                "ReplicationGroupNotFoundFault",
1810                format!("ReplicationGroup {group_id} not found."),
1811            )
1812        })?;
1813
1814        let arn = format!(
1815            "arn:aws:elasticache:{}:{}:snapshot:{}",
1816            state.region, state.account_id, snapshot_name
1817        );
1818
1819        let snapshot = CacheSnapshot {
1820            snapshot_name: snapshot_name.clone(),
1821            replication_group_id: group.replication_group_id.clone(),
1822            replication_group_description: group.description.clone(),
1823            snapshot_status: "available".to_string(),
1824            cache_node_type: group.cache_node_type.clone(),
1825            engine: group.engine.clone(),
1826            engine_version: group.engine_version.clone(),
1827            num_cache_clusters: group.num_cache_clusters,
1828            arn: arn.clone(),
1829            created_at: chrono::Utc::now().to_rfc3339(),
1830            snapshot_source: "manual".to_string(),
1831        };
1832
1833        let xml = snapshot_xml(&snapshot);
1834        state.tags.insert(arn, Vec::new());
1835        state.snapshots.insert(snapshot_name, snapshot);
1836
1837        Ok(AwsResponse::xml(
1838            StatusCode::OK,
1839            xml_wrap(
1840                "CreateSnapshot",
1841                &format!("<Snapshot>{xml}</Snapshot>"),
1842                &request.request_id,
1843            ),
1844        ))
1845    }
1846
1847    fn describe_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1848        let snapshot_name = optional_param(request, "SnapshotName");
1849        let replication_group_id = optional_param(request, "ReplicationGroupId");
1850        let cache_cluster_id = optional_param(request, "CacheClusterId");
1851        let max_records = optional_usize_param(request, "MaxRecords")?;
1852        let marker = optional_param(request, "Marker");
1853
1854        let state = self.state.read();
1855
1856        let snapshots: Vec<&CacheSnapshot> = if let Some(ref name) = snapshot_name {
1857            match state.snapshots.get(name) {
1858                Some(s) => vec![s],
1859                None => {
1860                    return Err(AwsServiceError::aws_error(
1861                        StatusCode::NOT_FOUND,
1862                        "SnapshotNotFoundFault",
1863                        format!("Snapshot {name} not found."),
1864                    ));
1865                }
1866            }
1867        } else {
1868            let mut snaps: Vec<&CacheSnapshot> = state
1869                .snapshots
1870                .values()
1871                .filter(|s| {
1872                    replication_group_id
1873                        .as_ref()
1874                        .is_none_or(|id| s.replication_group_id == *id)
1875                })
1876                .filter(|s| {
1877                    cache_cluster_id.as_ref().is_none_or(|cluster_id| {
1878                        state.cache_clusters.get(cluster_id).is_some_and(|cluster| {
1879                            cluster.replication_group_id.as_deref() == Some(&s.replication_group_id)
1880                        }) || state
1881                            .replication_groups
1882                            .get(&s.replication_group_id)
1883                            .is_some_and(|g| g.member_clusters.contains(cluster_id))
1884                    })
1885                })
1886                .collect();
1887            snaps.sort_by(|a, b| a.snapshot_name.cmp(&b.snapshot_name));
1888            snaps
1889        };
1890
1891        let (page, next_marker) = paginate(&snapshots, marker.as_deref(), max_records);
1892
1893        let members_xml: String = page
1894            .iter()
1895            .map(|s| format!("<Snapshot>{}</Snapshot>", snapshot_xml(s)))
1896            .collect();
1897        let marker_xml = next_marker
1898            .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
1899            .unwrap_or_default();
1900
1901        Ok(AwsResponse::xml(
1902            StatusCode::OK,
1903            xml_wrap(
1904                "DescribeSnapshots",
1905                &format!("<Snapshots>{members_xml}</Snapshots>{marker_xml}"),
1906                &request.request_id,
1907            ),
1908        ))
1909    }
1910
1911    fn delete_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1912        let snapshot_name = required_param(request, "SnapshotName")?;
1913
1914        let mut state = self.state.write();
1915        let mut snapshot = state.snapshots.remove(&snapshot_name).ok_or_else(|| {
1916            AwsServiceError::aws_error(
1917                StatusCode::NOT_FOUND,
1918                "SnapshotNotFoundFault",
1919                format!("Snapshot {snapshot_name} not found."),
1920            )
1921        })?;
1922        state.tags.remove(&snapshot.arn);
1923
1924        snapshot.snapshot_status = "deleting".to_string();
1925        let xml = snapshot_xml(&snapshot);
1926
1927        Ok(AwsResponse::xml(
1928            StatusCode::OK,
1929            xml_wrap(
1930                "DeleteSnapshot",
1931                &format!("<Snapshot>{xml}</Snapshot>"),
1932                &request.request_id,
1933            ),
1934        ))
1935    }
1936
1937    fn modify_replication_group(
1938        &self,
1939        request: &AwsRequest,
1940    ) -> Result<AwsResponse, AwsServiceError> {
1941        let replication_group_id = required_param(request, "ReplicationGroupId")?;
1942
1943        let new_description = optional_param(request, "ReplicationGroupDescription");
1944        let new_cache_node_type = optional_param(request, "CacheNodeType");
1945        let new_engine_version = optional_param(request, "EngineVersion");
1946        let new_automatic_failover =
1947            parse_optional_bool(optional_param(request, "AutomaticFailoverEnabled").as_deref())?;
1948        let new_snapshot_retention_limit = optional_param(request, "SnapshotRetentionLimit")
1949            .map(|v| {
1950                let val = v.parse::<i32>().map_err(|_| {
1951                    AwsServiceError::aws_error(
1952                        StatusCode::BAD_REQUEST,
1953                        "InvalidParameterValue",
1954                        format!("Invalid value for SnapshotRetentionLimit: '{v}'"),
1955                    )
1956                })?;
1957                if val < 0 {
1958                    return Err(AwsServiceError::aws_error(
1959                        StatusCode::BAD_REQUEST,
1960                        "InvalidParameterValue",
1961                        format!("SnapshotRetentionLimit must be non-negative, got {val}"),
1962                    ));
1963                }
1964                Ok(val)
1965            })
1966            .transpose()?;
1967        let new_snapshot_window = optional_param(request, "SnapshotWindow");
1968        let user_group_ids_to_add =
1969            parse_member_list(&request.query_params, "UserGroupIdsToAdd", "member");
1970        let user_group_ids_to_remove =
1971            parse_member_list(&request.query_params, "UserGroupIdsToRemove", "member");
1972
1973        let mut state = self.state.write();
1974
1975        let group = state
1976            .replication_groups
1977            .get_mut(&replication_group_id)
1978            .ok_or_else(|| {
1979                AwsServiceError::aws_error(
1980                    StatusCode::NOT_FOUND,
1981                    "ReplicationGroupNotFoundFault",
1982                    format!("ReplicationGroup {replication_group_id} not found."),
1983                )
1984            })?;
1985
1986        if let Some(desc) = new_description {
1987            group.description = desc;
1988        }
1989        if let Some(node_type) = new_cache_node_type {
1990            group.cache_node_type = node_type;
1991        }
1992        if let Some(version) = new_engine_version {
1993            group.engine_version = version;
1994        }
1995        if let Some(af) = new_automatic_failover {
1996            group.automatic_failover_enabled = af;
1997        }
1998        if let Some(limit) = new_snapshot_retention_limit {
1999            group.snapshot_retention_limit = limit;
2000        }
2001        if let Some(window) = new_snapshot_window {
2002            group.snapshot_window = window;
2003        }
2004
2005        // Associate/disassociate user groups
2006        for ug_id in &user_group_ids_to_add {
2007            if let Some(ug) = state.user_groups.get_mut(ug_id) {
2008                if !ug.replication_groups.contains(&replication_group_id) {
2009                    ug.replication_groups.push(replication_group_id.clone());
2010                }
2011            }
2012        }
2013        for ug_id in &user_group_ids_to_remove {
2014            if let Some(ug) = state.user_groups.get_mut(ug_id) {
2015                ug.replication_groups
2016                    .retain(|id| id != &replication_group_id);
2017            }
2018        }
2019
2020        let group = state.replication_groups[&replication_group_id].clone();
2021        let region = state.region.clone();
2022        let xml = replication_group_xml(&group, &region);
2023
2024        Ok(AwsResponse::xml(
2025            StatusCode::OK,
2026            xml_wrap(
2027                "ModifyReplicationGroup",
2028                &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
2029                &request.request_id,
2030            ),
2031        ))
2032    }
2033
2034    fn modify_global_replication_group(
2035        &self,
2036        request: &AwsRequest,
2037    ) -> Result<AwsResponse, AwsServiceError> {
2038        let global_replication_group_id = required_param(request, "GlobalReplicationGroupId")?;
2039        let _apply_immediately = parse_required_bool(request, "ApplyImmediately")?;
2040        let new_description = optional_param(request, "GlobalReplicationGroupDescription");
2041        let new_cache_node_type = optional_param(request, "CacheNodeType");
2042        let new_engine = optional_param(request, "Engine");
2043        let new_engine_version = optional_param(request, "EngineVersion");
2044        let new_automatic_failover =
2045            parse_optional_bool(optional_param(request, "AutomaticFailoverEnabled").as_deref())?;
2046
2047        let mut state = self.state.write();
2048        let primary_replication_group_id = state
2049            .global_replication_groups
2050            .get(&global_replication_group_id)
2051            .and_then(primary_global_member)
2052            .map(|member| member.replication_group_id.clone())
2053            .ok_or_else(|| {
2054                AwsServiceError::aws_error(
2055                    StatusCode::NOT_FOUND,
2056                    "GlobalReplicationGroupNotFoundFault",
2057                    format!("GlobalReplicationGroup {global_replication_group_id} not found."),
2058                )
2059            })?;
2060
2061        if let Some(ref engine) = new_engine {
2062            validate_serverless_engine(engine)?;
2063            let current_engine =
2064                &state.global_replication_groups[&global_replication_group_id].engine;
2065            if engine != current_engine {
2066                return Err(AwsServiceError::aws_error(
2067                    StatusCode::BAD_REQUEST,
2068                    "InvalidParameterValue",
2069                    format!(
2070                        "Engine changes are not supported for GlobalReplicationGroup {global_replication_group_id}."
2071                    ),
2072                ));
2073            }
2074        }
2075
2076        if let Some(primary_group) = state
2077            .replication_groups
2078            .get_mut(&primary_replication_group_id)
2079        {
2080            if let Some(cache_node_type) = new_cache_node_type.clone() {
2081                primary_group.cache_node_type = cache_node_type;
2082            }
2083            if let Some(engine_version) = new_engine_version.clone() {
2084                primary_group.engine_version = engine_version;
2085            }
2086            if let Some(automatic_failover) = new_automatic_failover {
2087                primary_group.automatic_failover_enabled = automatic_failover;
2088            }
2089        }
2090
2091        let primary_group = state.replication_groups[&primary_replication_group_id].clone();
2092        let group = state
2093            .global_replication_groups
2094            .get_mut(&global_replication_group_id)
2095            .expect("global replication group exists");
2096        if let Some(description) = new_description {
2097            group.global_replication_group_description = description;
2098        }
2099        group.cache_node_type = primary_group.cache_node_type.clone();
2100        group.engine = primary_group.engine.clone();
2101        group.engine_version = primary_group.engine_version.clone();
2102        if let Some(member) = group
2103            .members
2104            .iter_mut()
2105            .find(|member| member.role == "primary")
2106        {
2107            member.automatic_failover = primary_group.automatic_failover_enabled;
2108        }
2109
2110        let xml = global_replication_group_xml(group, true);
2111        Ok(AwsResponse::xml(
2112            StatusCode::OK,
2113            xml_wrap(
2114                "ModifyGlobalReplicationGroup",
2115                &format!("<GlobalReplicationGroup>{xml}</GlobalReplicationGroup>"),
2116                &request.request_id,
2117            ),
2118        ))
2119    }
2120
2121    fn increase_replica_count(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2122        let replication_group_id = required_param(request, "ReplicationGroupId")?;
2123        let apply_str = required_param(request, "ApplyImmediately")?;
2124        let _apply_immediately = parse_optional_bool(Some(&apply_str))?.ok_or_else(|| {
2125            AwsServiceError::aws_error(
2126                StatusCode::BAD_REQUEST,
2127                "InvalidParameterValue",
2128                format!(
2129                    "Invalid boolean value for ApplyImmediately: '{}'",
2130                    apply_str
2131                ),
2132            )
2133        })?;
2134
2135        let new_replica_count = optional_param(request, "NewReplicaCount")
2136            .map(|v| {
2137                v.parse::<i32>().map_err(|_| {
2138                    AwsServiceError::aws_error(
2139                        StatusCode::BAD_REQUEST,
2140                        "InvalidParameterValue",
2141                        format!("Invalid value for NewReplicaCount: '{v}'"),
2142                    )
2143                })
2144            })
2145            .transpose()?
2146            .ok_or_else(|| {
2147                AwsServiceError::aws_error(
2148                    StatusCode::BAD_REQUEST,
2149                    "MissingParameter",
2150                    "The request must contain the parameter NewReplicaCount.".to_string(),
2151                )
2152            })?;
2153
2154        if new_replica_count < 1 {
2155            return Err(AwsServiceError::aws_error(
2156                StatusCode::BAD_REQUEST,
2157                "InvalidParameterValue",
2158                format!("NewReplicaCount must be a positive integer, got {new_replica_count}"),
2159            ));
2160        }
2161
2162        let mut state = self.state.write();
2163
2164        let group = state
2165            .replication_groups
2166            .get_mut(&replication_group_id)
2167            .ok_or_else(|| {
2168                AwsServiceError::aws_error(
2169                    StatusCode::NOT_FOUND,
2170                    "ReplicationGroupNotFoundFault",
2171                    format!("ReplicationGroup {replication_group_id} not found."),
2172                )
2173            })?;
2174
2175        // new_replica_count is number of replicas (excluding primary), so total clusters = replicas + 1
2176        let new_total = new_replica_count.checked_add(1).ok_or_else(|| {
2177            AwsServiceError::aws_error(
2178                StatusCode::BAD_REQUEST,
2179                "InvalidParameterValue",
2180                format!("NewReplicaCount value {new_replica_count} is too large"),
2181            )
2182        })?;
2183        if new_total <= group.num_cache_clusters {
2184            return Err(AwsServiceError::aws_error(
2185                StatusCode::BAD_REQUEST,
2186                "InvalidParameterValue",
2187                format!(
2188                    "NewReplicaCount ({new_replica_count}) must result in more clusters than current count ({}).",
2189                    group.num_cache_clusters
2190                ),
2191            ));
2192        }
2193
2194        group.num_cache_clusters = new_total;
2195        group.member_clusters = (1..=new_total)
2196            .map(|i| format!("{replication_group_id}-{i:03}"))
2197            .collect();
2198
2199        let group = group.clone();
2200        let region = state.region.clone();
2201        let xml = replication_group_xml(&group, &region);
2202
2203        Ok(AwsResponse::xml(
2204            StatusCode::OK,
2205            xml_wrap(
2206                "IncreaseReplicaCount",
2207                &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
2208                &request.request_id,
2209            ),
2210        ))
2211    }
2212
2213    fn decrease_replica_count(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2214        let replication_group_id = required_param(request, "ReplicationGroupId")?;
2215        let apply_str = required_param(request, "ApplyImmediately")?;
2216        let _apply_immediately = parse_optional_bool(Some(&apply_str))?.ok_or_else(|| {
2217            AwsServiceError::aws_error(
2218                StatusCode::BAD_REQUEST,
2219                "InvalidParameterValue",
2220                format!(
2221                    "Invalid boolean value for ApplyImmediately: '{}'",
2222                    apply_str
2223                ),
2224            )
2225        })?;
2226
2227        let new_replica_count = optional_param(request, "NewReplicaCount")
2228            .map(|v| {
2229                v.parse::<i32>().map_err(|_| {
2230                    AwsServiceError::aws_error(
2231                        StatusCode::BAD_REQUEST,
2232                        "InvalidParameterValue",
2233                        format!("Invalid value for NewReplicaCount: '{v}'"),
2234                    )
2235                })
2236            })
2237            .transpose()?
2238            .ok_or_else(|| {
2239                AwsServiceError::aws_error(
2240                    StatusCode::BAD_REQUEST,
2241                    "MissingParameter",
2242                    "The request must contain the parameter NewReplicaCount.".to_string(),
2243                )
2244            })?;
2245
2246        if new_replica_count < 0 {
2247            return Err(AwsServiceError::aws_error(
2248                StatusCode::BAD_REQUEST,
2249                "InvalidParameterValue",
2250                format!("NewReplicaCount must be non-negative, got {new_replica_count}"),
2251            ));
2252        }
2253
2254        let mut state = self.state.write();
2255
2256        let group = state
2257            .replication_groups
2258            .get_mut(&replication_group_id)
2259            .ok_or_else(|| {
2260                AwsServiceError::aws_error(
2261                    StatusCode::NOT_FOUND,
2262                    "ReplicationGroupNotFoundFault",
2263                    format!("ReplicationGroup {replication_group_id} not found."),
2264                )
2265            })?;
2266
2267        // new_replica_count is number of replicas (excluding primary), so total clusters = replicas + 1
2268        let new_total = new_replica_count.checked_add(1).ok_or_else(|| {
2269            AwsServiceError::aws_error(
2270                StatusCode::BAD_REQUEST,
2271                "InvalidParameterValue",
2272                format!("NewReplicaCount value {new_replica_count} is too large"),
2273            )
2274        })?;
2275        if new_total >= group.num_cache_clusters {
2276            return Err(AwsServiceError::aws_error(
2277                StatusCode::BAD_REQUEST,
2278                "InvalidParameterValue",
2279                format!(
2280                    "NewReplicaCount ({new_replica_count}) must result in fewer clusters than current count ({}).",
2281                    group.num_cache_clusters
2282                ),
2283            ));
2284        }
2285
2286        group.num_cache_clusters = new_total;
2287        group.member_clusters = (1..=new_total)
2288            .map(|i| format!("{replication_group_id}-{i:03}"))
2289            .collect();
2290
2291        let group = group.clone();
2292        let region = state.region.clone();
2293        let xml = replication_group_xml(&group, &region);
2294
2295        Ok(AwsResponse::xml(
2296            StatusCode::OK,
2297            xml_wrap(
2298                "DecreaseReplicaCount",
2299                &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
2300                &request.request_id,
2301            ),
2302        ))
2303    }
2304
2305    fn test_failover(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2306        let replication_group_id = required_param(request, "ReplicationGroupId")?;
2307        let node_group_id = required_param(request, "NodeGroupId")?;
2308
2309        let state = self.state.read();
2310
2311        let group = state
2312            .replication_groups
2313            .get(&replication_group_id)
2314            .ok_or_else(|| {
2315                AwsServiceError::aws_error(
2316                    StatusCode::NOT_FOUND,
2317                    "ReplicationGroupNotFoundFault",
2318                    format!("ReplicationGroup {replication_group_id} not found."),
2319                )
2320            })?;
2321
2322        // Our replication groups always have a single node group with ID "0001"
2323        if node_group_id != "0001" {
2324            return Err(AwsServiceError::aws_error(
2325                StatusCode::NOT_FOUND,
2326                "NodeGroupNotFoundFault",
2327                format!("NodeGroup {node_group_id} not found in ReplicationGroup {replication_group_id}."),
2328            ));
2329        }
2330
2331        let region = state.region.clone();
2332        let xml = replication_group_xml(group, &region);
2333
2334        Ok(AwsResponse::xml(
2335            StatusCode::OK,
2336            xml_wrap(
2337                "TestFailover",
2338                &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
2339                &request.request_id,
2340            ),
2341        ))
2342    }
2343
2344    fn disassociate_global_replication_group(
2345        &self,
2346        request: &AwsRequest,
2347    ) -> Result<AwsResponse, AwsServiceError> {
2348        let global_replication_group_id = required_param(request, "GlobalReplicationGroupId")?;
2349        let replication_group_id = required_param(request, "ReplicationGroupId")?;
2350        let replication_group_region = required_param(request, "ReplicationGroupRegion")?;
2351
2352        let state = self.state.read();
2353        let group = state
2354            .global_replication_groups
2355            .get(&global_replication_group_id)
2356            .ok_or_else(|| {
2357                AwsServiceError::aws_error(
2358                    StatusCode::NOT_FOUND,
2359                    "GlobalReplicationGroupNotFoundFault",
2360                    format!("GlobalReplicationGroup {global_replication_group_id} not found."),
2361                )
2362            })?;
2363
2364        let primary_member = primary_global_member(group).ok_or_else(|| {
2365            AwsServiceError::aws_error(
2366                StatusCode::BAD_REQUEST,
2367                "InvalidGlobalReplicationGroupState",
2368                format!(
2369                    "GlobalReplicationGroup {global_replication_group_id} does not have a primary member."
2370                ),
2371            )
2372        })?;
2373        if primary_member.replication_group_id != replication_group_id
2374            || primary_member.replication_group_region != replication_group_region
2375        {
2376            return Err(AwsServiceError::aws_error(
2377                StatusCode::BAD_REQUEST,
2378                "InvalidParameterValue",
2379                format!(
2380                    "ReplicationGroup {replication_group_id} in region {replication_group_region} is not associated with GlobalReplicationGroup {global_replication_group_id}."
2381                ),
2382            ));
2383        }
2384
2385        let xml = global_replication_group_xml(group, true);
2386        Ok(AwsResponse::xml(
2387            StatusCode::OK,
2388            xml_wrap(
2389                "DisassociateGlobalReplicationGroup",
2390                &format!("<GlobalReplicationGroup>{xml}</GlobalReplicationGroup>"),
2391                &request.request_id,
2392            ),
2393        ))
2394    }
2395
2396    fn failover_global_replication_group(
2397        &self,
2398        request: &AwsRequest,
2399    ) -> Result<AwsResponse, AwsServiceError> {
2400        let global_replication_group_id = required_param(request, "GlobalReplicationGroupId")?;
2401        let primary_region = required_param(request, "PrimaryRegion")?;
2402        let primary_replication_group_id = required_param(request, "PrimaryReplicationGroupId")?;
2403
2404        let state = self.state.read();
2405        let group = state
2406            .global_replication_groups
2407            .get(&global_replication_group_id)
2408            .ok_or_else(|| {
2409                AwsServiceError::aws_error(
2410                    StatusCode::NOT_FOUND,
2411                    "GlobalReplicationGroupNotFoundFault",
2412                    format!("GlobalReplicationGroup {global_replication_group_id} not found."),
2413                )
2414            })?;
2415
2416        let primary_member = primary_global_member(group).ok_or_else(|| {
2417            AwsServiceError::aws_error(
2418                StatusCode::BAD_REQUEST,
2419                "InvalidGlobalReplicationGroupState",
2420                format!(
2421                    "GlobalReplicationGroup {global_replication_group_id} does not have a primary member."
2422                ),
2423            )
2424        })?;
2425        if primary_member.replication_group_id != primary_replication_group_id
2426            || primary_member.replication_group_region != primary_region
2427        {
2428            return Err(AwsServiceError::aws_error(
2429                StatusCode::BAD_REQUEST,
2430                "InvalidParameterValue",
2431                format!(
2432                    "PrimaryReplicationGroupId and PrimaryRegion do not match the current primary for GlobalReplicationGroup {global_replication_group_id}."
2433                ),
2434            ));
2435        }
2436
2437        let xml = global_replication_group_xml(group, true);
2438        Ok(AwsResponse::xml(
2439            StatusCode::OK,
2440            xml_wrap(
2441                "FailoverGlobalReplicationGroup",
2442                &format!("<GlobalReplicationGroup>{xml}</GlobalReplicationGroup>"),
2443                &request.request_id,
2444            ),
2445        ))
2446    }
2447
2448    fn create_user(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2449        let user_id = required_param(request, "UserId")?;
2450        let user_name = required_param(request, "UserName")?;
2451        let engine = required_param(request, "Engine")?;
2452        let access_string = required_param(request, "AccessString")?;
2453
2454        validate_engine(&engine)?;
2455
2456        let no_password_required =
2457            parse_optional_bool(optional_param(request, "NoPasswordRequired").as_deref())?
2458                .unwrap_or(false);
2459        let passwords = parse_member_list(&request.query_params, "Passwords", "member");
2460        let auth_mode_type = optional_param(request, "AuthenticationMode.Type");
2461
2462        let (authentication_type, password_count) = if no_password_required {
2463            if !passwords.is_empty() {
2464                return Err(AwsServiceError::aws_error(
2465                    StatusCode::BAD_REQUEST,
2466                    "InvalidParameterCombination",
2467                    "Passwords cannot be provided when NoPasswordRequired is true.".to_string(),
2468                ));
2469            }
2470            ("no-password".to_string(), 0)
2471        } else if let Some(ref mode) = auth_mode_type {
2472            let mode_passwords = parse_member_list(
2473                &request.query_params,
2474                "AuthenticationMode.Passwords",
2475                "member",
2476            );
2477            match mode.as_str() {
2478                "password" => {
2479                    if mode_passwords.is_empty() {
2480                        return Err(AwsServiceError::aws_error(
2481                            StatusCode::BAD_REQUEST,
2482                            "InvalidParameterValue",
2483                            "At least one password is required when AuthenticationMode.Type is password.".to_string(),
2484                        ));
2485                    }
2486                    ("password".to_string(), mode_passwords.len() as i32)
2487                }
2488                "no-password-required" | "iam" => {
2489                    if !mode_passwords.is_empty() {
2490                        return Err(AwsServiceError::aws_error(
2491                            StatusCode::BAD_REQUEST,
2492                            "InvalidParameterValue",
2493                            format!("Passwords cannot be provided when AuthenticationMode.Type is {mode}."),
2494                        ));
2495                    }
2496                    (mode.clone(), 0)
2497                }
2498                _ => {
2499                    return Err(AwsServiceError::aws_error(
2500                        StatusCode::BAD_REQUEST,
2501                        "InvalidParameterValue",
2502                        format!("Invalid value for AuthenticationMode.Type: {mode}. Supported values: password, iam, no-password-required"),
2503                    ));
2504                }
2505            }
2506        } else if !passwords.is_empty() {
2507            ("password".to_string(), passwords.len() as i32)
2508        } else {
2509            ("no-password".to_string(), 0)
2510        };
2511
2512        let minimum_engine_version = if engine == ENGINE_VALKEY {
2513            "8.0".to_string()
2514        } else {
2515            "6.0".to_string()
2516        };
2517
2518        let mut state = self.state.write();
2519
2520        if state.users.contains_key(&user_id) {
2521            return Err(AwsServiceError::aws_error(
2522                StatusCode::BAD_REQUEST,
2523                "UserAlreadyExistsFault",
2524                format!("User {user_id} already exists."),
2525            ));
2526        }
2527
2528        let arn = format!(
2529            "arn:aws:elasticache:{}:{}:user:{}",
2530            state.region, state.account_id, user_id
2531        );
2532
2533        let user = ElastiCacheUser {
2534            user_id: user_id.clone(),
2535            user_name,
2536            engine,
2537            access_string,
2538            status: "active".to_string(),
2539            authentication_type,
2540            password_count,
2541            arn,
2542            minimum_engine_version,
2543            user_group_ids: Vec::new(),
2544        };
2545
2546        let xml = user_xml(&user);
2547        state.register_arn(&user.arn);
2548        state.users.insert(user_id, user);
2549
2550        Ok(AwsResponse::xml(
2551            StatusCode::OK,
2552            xml_wrap("CreateUser", &xml, &request.request_id),
2553        ))
2554    }
2555
2556    fn describe_users(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2557        let user_id = optional_param(request, "UserId");
2558        let max_records = optional_usize_param(request, "MaxRecords")?;
2559        let marker = optional_param(request, "Marker");
2560
2561        let state = self.state.read();
2562
2563        let users: Vec<&ElastiCacheUser> = if let Some(ref id) = user_id {
2564            match state.users.get(id) {
2565                Some(u) => vec![u],
2566                None => {
2567                    return Err(AwsServiceError::aws_error(
2568                        StatusCode::NOT_FOUND,
2569                        "UserNotFoundFault",
2570                        format!("User {id} not found."),
2571                    ));
2572                }
2573            }
2574        } else {
2575            let mut users: Vec<&ElastiCacheUser> = state.users.values().collect();
2576            users.sort_by(|a, b| a.user_id.cmp(&b.user_id));
2577            users
2578        };
2579
2580        let (page, next_marker) = paginate(&users, marker.as_deref(), max_records);
2581
2582        let members_xml: String = page
2583            .iter()
2584            .map(|u| format!("<member>{}</member>", user_xml(u)))
2585            .collect();
2586        let marker_xml = next_marker
2587            .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
2588            .unwrap_or_default();
2589
2590        Ok(AwsResponse::xml(
2591            StatusCode::OK,
2592            xml_wrap(
2593                "DescribeUsers",
2594                &format!("<Users>{members_xml}</Users>{marker_xml}"),
2595                &request.request_id,
2596            ),
2597        ))
2598    }
2599
2600    fn delete_user(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2601        let user_id = required_param(request, "UserId")?;
2602
2603        if user_id == "default" {
2604            return Err(AwsServiceError::aws_error(
2605                StatusCode::BAD_REQUEST,
2606                "InvalidParameterValue",
2607                "Cannot delete the default user.".to_string(),
2608            ));
2609        }
2610
2611        let mut state = self.state.write();
2612
2613        let user = state.users.remove(&user_id).ok_or_else(|| {
2614            AwsServiceError::aws_error(
2615                StatusCode::NOT_FOUND,
2616                "UserNotFoundFault",
2617                format!("User {user_id} not found."),
2618            )
2619        })?;
2620
2621        state.tags.remove(&user.arn);
2622
2623        // Remove user from any user groups
2624        for group in state.user_groups.values_mut() {
2625            group.user_ids.retain(|id| id != &user_id);
2626        }
2627
2628        let mut deleted_user = user;
2629        deleted_user.status = "deleting".to_string();
2630        let xml = user_xml(&deleted_user);
2631
2632        Ok(AwsResponse::xml(
2633            StatusCode::OK,
2634            xml_wrap("DeleteUser", &xml, &request.request_id),
2635        ))
2636    }
2637
2638    fn create_user_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2639        let user_group_id = required_param(request, "UserGroupId")?;
2640        let engine = required_param(request, "Engine")?;
2641
2642        validate_engine(&engine)?;
2643
2644        let user_ids = parse_member_list(&request.query_params, "UserIds", "member");
2645
2646        let minimum_engine_version = if engine == ENGINE_VALKEY {
2647            "8.0".to_string()
2648        } else {
2649            "6.0".to_string()
2650        };
2651
2652        let mut state = self.state.write();
2653
2654        if state.user_groups.contains_key(&user_group_id) {
2655            return Err(AwsServiceError::aws_error(
2656                StatusCode::BAD_REQUEST,
2657                "UserGroupAlreadyExistsFault",
2658                format!("User Group {user_group_id} already exists."),
2659            ));
2660        }
2661
2662        // Validate all referenced users exist and have a matching engine
2663        for uid in &user_ids {
2664            match state.users.get(uid) {
2665                None => {
2666                    return Err(AwsServiceError::aws_error(
2667                        StatusCode::NOT_FOUND,
2668                        "UserNotFoundFault",
2669                        format!("User {uid} not found."),
2670                    ));
2671                }
2672                Some(user) if user.engine != engine => {
2673                    return Err(AwsServiceError::aws_error(
2674                        StatusCode::BAD_REQUEST,
2675                        "InvalidParameterValue",
2676                        format!(
2677                            "User {uid} has engine {} which does not match the user group engine {engine}.",
2678                            user.engine
2679                        ),
2680                    ));
2681                }
2682                _ => {}
2683            }
2684        }
2685
2686        let arn = format!(
2687            "arn:aws:elasticache:{}:{}:usergroup:{}",
2688            state.region, state.account_id, user_group_id
2689        );
2690
2691        let group = ElastiCacheUserGroup {
2692            user_group_id: user_group_id.clone(),
2693            engine,
2694            status: "active".to_string(),
2695            user_ids: user_ids.clone(),
2696            arn,
2697            minimum_engine_version,
2698            pending_changes: None,
2699            replication_groups: Vec::new(),
2700        };
2701
2702        // Update user_group_ids on referenced users
2703        for uid in &user_ids {
2704            if let Some(user) = state.users.get_mut(uid) {
2705                user.user_group_ids.push(user_group_id.clone());
2706            }
2707        }
2708
2709        let xml = user_group_xml(&group);
2710        state.register_arn(&group.arn);
2711        state.user_groups.insert(user_group_id, group);
2712
2713        Ok(AwsResponse::xml(
2714            StatusCode::OK,
2715            xml_wrap("CreateUserGroup", &xml, &request.request_id),
2716        ))
2717    }
2718
2719    fn describe_user_groups(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2720        let user_group_id = optional_param(request, "UserGroupId");
2721        let max_records = optional_usize_param(request, "MaxRecords")?;
2722        let marker = optional_param(request, "Marker");
2723
2724        let state = self.state.read();
2725
2726        let groups: Vec<&ElastiCacheUserGroup> = if let Some(ref id) = user_group_id {
2727            match state.user_groups.get(id) {
2728                Some(g) => vec![g],
2729                None => {
2730                    return Err(AwsServiceError::aws_error(
2731                        StatusCode::NOT_FOUND,
2732                        "UserGroupNotFoundFault",
2733                        format!("User Group {id} not found."),
2734                    ));
2735                }
2736            }
2737        } else {
2738            let mut groups: Vec<&ElastiCacheUserGroup> = state.user_groups.values().collect();
2739            groups.sort_by(|a, b| a.user_group_id.cmp(&b.user_group_id));
2740            groups
2741        };
2742
2743        let (page, next_marker) = paginate(&groups, marker.as_deref(), max_records);
2744
2745        let members_xml: String = page
2746            .iter()
2747            .map(|g| format!("<member>{}</member>", user_group_xml(g)))
2748            .collect();
2749        let marker_xml = next_marker
2750            .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
2751            .unwrap_or_default();
2752
2753        Ok(AwsResponse::xml(
2754            StatusCode::OK,
2755            xml_wrap(
2756                "DescribeUserGroups",
2757                &format!("<UserGroups>{members_xml}</UserGroups>{marker_xml}"),
2758                &request.request_id,
2759            ),
2760        ))
2761    }
2762
2763    fn delete_user_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2764        let user_group_id = required_param(request, "UserGroupId")?;
2765
2766        let mut state = self.state.write();
2767
2768        let group = state.user_groups.remove(&user_group_id).ok_or_else(|| {
2769            AwsServiceError::aws_error(
2770                StatusCode::NOT_FOUND,
2771                "UserGroupNotFoundFault",
2772                format!("User Group {user_group_id} not found."),
2773            )
2774        })?;
2775
2776        state.tags.remove(&group.arn);
2777
2778        // Remove this group from users' user_group_ids
2779        for uid in &group.user_ids {
2780            if let Some(user) = state.users.get_mut(uid) {
2781                user.user_group_ids.retain(|gid| gid != &user_group_id);
2782            }
2783        }
2784
2785        let mut deleted_group = group;
2786        deleted_group.status = "deleting".to_string();
2787        let xml = user_group_xml(&deleted_group);
2788
2789        Ok(AwsResponse::xml(
2790            StatusCode::OK,
2791            xml_wrap("DeleteUserGroup", &xml, &request.request_id),
2792        ))
2793    }
2794
2795    fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2796        let resource_name = required_param(request, "ResourceName")?;
2797        let tags = parse_tags(request)?;
2798
2799        let mut state = self.state.write();
2800        let tag_list = state.tags.get_mut(&resource_name).ok_or_else(|| {
2801            AwsServiceError::aws_error(
2802                StatusCode::NOT_FOUND,
2803                "CacheClusterNotFound",
2804                format!("The resource {resource_name} could not be found."),
2805            )
2806        })?;
2807
2808        merge_tags(tag_list, &tags);
2809
2810        let tag_xml: String = tag_list.iter().map(tag_xml).collect();
2811
2812        Ok(AwsResponse::xml(
2813            StatusCode::OK,
2814            xml_wrap(
2815                "AddTagsToResource",
2816                &format!("<TagList>{tag_xml}</TagList>"),
2817                &request.request_id,
2818            ),
2819        ))
2820    }
2821
2822    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2823        let resource_name = required_param(request, "ResourceName")?;
2824
2825        let state = self.state.read();
2826        let tag_list = state.tags.get(&resource_name).ok_or_else(|| {
2827            AwsServiceError::aws_error(
2828                StatusCode::NOT_FOUND,
2829                "CacheClusterNotFound",
2830                format!("The resource {resource_name} could not be found."),
2831            )
2832        })?;
2833
2834        let tag_xml: String = tag_list.iter().map(tag_xml).collect();
2835
2836        Ok(AwsResponse::xml(
2837            StatusCode::OK,
2838            xml_wrap(
2839                "ListTagsForResource",
2840                &format!("<TagList>{tag_xml}</TagList>"),
2841                &request.request_id,
2842            ),
2843        ))
2844    }
2845
2846    fn remove_tags_from_resource(
2847        &self,
2848        request: &AwsRequest,
2849    ) -> Result<AwsResponse, AwsServiceError> {
2850        let resource_name = required_param(request, "ResourceName")?;
2851        let tag_keys = parse_tag_keys(request)?;
2852
2853        let mut state = self.state.write();
2854        let tag_list = state.tags.get_mut(&resource_name).ok_or_else(|| {
2855            AwsServiceError::aws_error(
2856                StatusCode::NOT_FOUND,
2857                "CacheClusterNotFound",
2858                format!("The resource {resource_name} could not be found."),
2859            )
2860        })?;
2861
2862        tag_list.retain(|(key, _)| !tag_keys.contains(key));
2863
2864        Ok(AwsResponse::xml(
2865            StatusCode::OK,
2866            xml_wrap("RemoveTagsFromResource", "", &request.request_id),
2867        ))
2868    }
2869}
2870
2871// Helpers
2872
2873fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
2874    fakecloud_core::query::optional_query_param(req, name)
2875}
2876
2877fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
2878    fakecloud_core::query::required_query_param(req, name)
2879}
2880
2881fn parse_required_bool(req: &AwsRequest, name: &str) -> Result<bool, AwsServiceError> {
2882    parse_optional_bool(Some(&required_param(req, name)?))?.ok_or_else(|| {
2883        AwsServiceError::aws_error(
2884            StatusCode::BAD_REQUEST,
2885            "InvalidParameterValue",
2886            format!("Boolean parameter {name} is invalid."),
2887        )
2888    })
2889}
2890
2891fn validate_serverless_engine(engine: &str) -> Result<(), AwsServiceError> {
2892    validate_engine(engine)
2893}
2894
2895fn default_major_engine_version(engine: &str) -> &'static str {
2896    if engine == ENGINE_VALKEY {
2897        "8.0"
2898    } else {
2899        "7.1"
2900    }
2901}
2902
2903fn default_full_engine_version(
2904    engine: &str,
2905    major_engine_version: &str,
2906) -> Result<String, AwsServiceError> {
2907    if major_engine_version.is_empty() {
2908        return Err(AwsServiceError::aws_error(
2909            StatusCode::BAD_REQUEST,
2910            "InvalidParameterValue",
2911            "MajorEngineVersion must not be empty.".to_string(),
2912        ));
2913    }
2914
2915    if (engine == ENGINE_REDIS && !major_engine_version.starts_with('7'))
2916        || (engine == ENGINE_VALKEY && !major_engine_version.starts_with('8'))
2917    {
2918        return Err(AwsServiceError::aws_error(
2919            StatusCode::BAD_REQUEST,
2920            "InvalidParameterValue",
2921            format!(
2922                "MajorEngineVersion {major_engine_version} is not supported for engine {engine}."
2923            ),
2924        ));
2925    }
2926
2927    Ok(major_engine_version.to_string())
2928}
2929
2930fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
2931    value
2932        .map(|raw| match raw {
2933            "true" | "True" | "TRUE" => Ok(true),
2934            "false" | "False" | "FALSE" => Ok(false),
2935            _ => Err(AwsServiceError::aws_error(
2936                StatusCode::BAD_REQUEST,
2937                "InvalidParameterValue",
2938                format!("Boolean parameter value '{raw}' is invalid."),
2939            )),
2940        })
2941        .transpose()
2942}
2943
2944fn optional_non_negative_i32_param(
2945    req: &AwsRequest,
2946    name: &str,
2947) -> Result<Option<i32>, AwsServiceError> {
2948    optional_param(req, name)
2949        .map(|v| {
2950            let parsed = v.parse::<i32>().map_err(|_| {
2951                AwsServiceError::aws_error(
2952                    StatusCode::BAD_REQUEST,
2953                    "InvalidParameterValue",
2954                    format!("Invalid value for {name}: '{v}'"),
2955                )
2956            })?;
2957            if parsed < 0 {
2958                return Err(AwsServiceError::aws_error(
2959                    StatusCode::BAD_REQUEST,
2960                    "InvalidParameterValue",
2961                    format!("{name} must be non-negative, got {parsed}"),
2962                ));
2963            }
2964            Ok(parsed)
2965        })
2966        .transpose()
2967}
2968
2969fn parse_cache_usage_limits(
2970    req: &AwsRequest,
2971) -> Result<Option<ServerlessCacheUsageLimits>, AwsServiceError> {
2972    let data_storage_maximum =
2973        optional_non_negative_i32_param(req, "CacheUsageLimits.DataStorage.Maximum")?;
2974    let data_storage_minimum =
2975        optional_non_negative_i32_param(req, "CacheUsageLimits.DataStorage.Minimum")?;
2976    let data_storage_unit = optional_param(req, "CacheUsageLimits.DataStorage.Unit");
2977    let ecpu_maximum =
2978        optional_non_negative_i32_param(req, "CacheUsageLimits.ECPUPerSecond.Maximum")?;
2979    let ecpu_minimum =
2980        optional_non_negative_i32_param(req, "CacheUsageLimits.ECPUPerSecond.Minimum")?;
2981
2982    if let (Some(minimum), Some(maximum)) = (data_storage_minimum, data_storage_maximum) {
2983        if minimum > maximum {
2984            return Err(AwsServiceError::aws_error(
2985                StatusCode::BAD_REQUEST,
2986                "InvalidParameterValue",
2987                format!(
2988                    "CacheUsageLimits.DataStorage.Minimum ({minimum}) must be less than or equal to Maximum ({maximum})."
2989                ),
2990            ));
2991        }
2992    }
2993    if let (Some(minimum), Some(maximum)) = (ecpu_minimum, ecpu_maximum) {
2994        if minimum > maximum {
2995            return Err(AwsServiceError::aws_error(
2996                StatusCode::BAD_REQUEST,
2997                "InvalidParameterValue",
2998                format!(
2999                    "CacheUsageLimits.ECPUPerSecond.Minimum ({minimum}) must be less than or equal to Maximum ({maximum})."
3000                ),
3001            ));
3002        }
3003    }
3004
3005    let data_storage = if data_storage_maximum.is_some()
3006        || data_storage_minimum.is_some()
3007        || data_storage_unit.is_some()
3008    {
3009        Some(ServerlessCacheDataStorage {
3010            maximum: data_storage_maximum,
3011            minimum: data_storage_minimum,
3012            unit: data_storage_unit,
3013        })
3014    } else {
3015        None
3016    };
3017    let ecpu_per_second = if ecpu_maximum.is_some() || ecpu_minimum.is_some() {
3018        Some(ServerlessCacheEcpuPerSecond {
3019            maximum: ecpu_maximum,
3020            minimum: ecpu_minimum,
3021        })
3022    } else {
3023        None
3024    };
3025
3026    if data_storage.is_none() && ecpu_per_second.is_none() {
3027        Ok(None)
3028    } else {
3029        Ok(Some(ServerlessCacheUsageLimits {
3030            data_storage,
3031            ecpu_per_second,
3032        }))
3033    }
3034}
3035
3036/// Parse an AWS Query protocol member list.
3037///
3038/// AWS encodes lists in the query string as:
3039///   `{param}.{member_name}.1=val1&{param}.{member_name}.2=val2`
3040///
3041/// Returns the values sorted by index.
3042fn parse_member_list(
3043    params: &std::collections::HashMap<String, String>,
3044    param: &str,
3045    member_name: &str,
3046) -> Vec<String> {
3047    let prefix = format!("{param}.{member_name}.");
3048    let mut indexed: Vec<(usize, String)> = params
3049        .iter()
3050        .filter_map(|(k, v)| {
3051            k.strip_prefix(&prefix)
3052                .and_then(|idx| idx.parse::<usize>().ok())
3053                .map(|idx| (idx, v.clone()))
3054        })
3055        .collect();
3056    indexed.sort_by_key(|(idx, _)| *idx);
3057    indexed.into_iter().map(|(_, v)| v).collect()
3058}
3059
3060fn parse_query_list_param(req: &AwsRequest, param: &str, member_name: &str) -> Vec<String> {
3061    let mut indexed = parse_member_list(&req.query_params, param, member_name);
3062    if indexed.is_empty() {
3063        indexed = parse_member_list(&req.query_params, param, "member");
3064    }
3065    if indexed.is_empty() {
3066        indexed = req.query_params.get(param).into_iter().cloned().collect();
3067    }
3068    indexed
3069}
3070
3071fn optional_usize_param(req: &AwsRequest, name: &str) -> Result<Option<usize>, AwsServiceError> {
3072    optional_param(req, name)
3073        .map(|v| {
3074            v.parse::<usize>().map_err(|_| {
3075                AwsServiceError::aws_error(
3076                    StatusCode::BAD_REQUEST,
3077                    "InvalidParameterValue",
3078                    format!("Value '{v}' for parameter {name} is not a valid integer."),
3079                )
3080            })
3081        })
3082        .transpose()
3083}
3084
3085fn parse_reserved_duration_filter(value: Option<String>) -> Result<Option<i32>, AwsServiceError> {
3086    value
3087        .map(|raw| match raw.as_str() {
3088            "1" => Ok(31_536_000),
3089            "3" => Ok(94_608_000),
3090            "31536000" => Ok(31_536_000),
3091            "94608000" => Ok(94_608_000),
3092            _ => Err(AwsServiceError::aws_error(
3093                StatusCode::BAD_REQUEST,
3094                "InvalidParameterValue",
3095                format!(
3096                    "Invalid value for Duration: {raw}. Valid values are 1, 3, 31536000, or 94608000."
3097                ),
3098            )),
3099        })
3100        .transpose()
3101}
3102
3103/// Simple index-based pagination. Returns the current page and an optional next marker.
3104fn paginate<T: Clone>(
3105    items: &[T],
3106    marker: Option<&str>,
3107    max_records: Option<usize>,
3108) -> (Vec<T>, Option<String>) {
3109    let start = marker.and_then(|m| m.parse::<usize>().ok()).unwrap_or(0);
3110    let limit = max_records.unwrap_or(100).min(100);
3111
3112    if start >= items.len() {
3113        return (Vec::new(), None);
3114    }
3115
3116    let end = start.saturating_add(limit).min(items.len());
3117    let page = items[start..end].to_vec();
3118    let next_marker = if end < items.len() {
3119        Some(end.to_string())
3120    } else {
3121        None
3122    };
3123    (page, next_marker)
3124}
3125
3126// Tag helpers
3127
3128fn parse_tags(req: &AwsRequest) -> Result<Vec<(String, String)>, AwsServiceError> {
3129    let mut tags = Vec::new();
3130    for index in 1.. {
3131        let key_name = format!("Tags.Tag.{index}.Key");
3132        let value_name = format!("Tags.Tag.{index}.Value");
3133        let key = optional_param(req, &key_name);
3134        let value = optional_param(req, &value_name);
3135        match (key, value) {
3136            (Some(k), Some(v)) => tags.push((k, v)),
3137            (None, None) => break,
3138            _ => {
3139                return Err(AwsServiceError::aws_error(
3140                    StatusCode::BAD_REQUEST,
3141                    "InvalidParameterValue",
3142                    "Each tag must include both Key and Value.",
3143                ));
3144            }
3145        }
3146    }
3147    Ok(tags)
3148}
3149
3150fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
3151    let mut keys = Vec::new();
3152    for index in 1.. {
3153        let key_name = format!("TagKeys.member.{index}");
3154        match optional_param(req, &key_name) {
3155            Some(key) => keys.push(key),
3156            None => break,
3157        }
3158    }
3159    Ok(keys)
3160}
3161
3162fn merge_tags(existing: &mut Vec<(String, String)>, incoming: &[(String, String)]) {
3163    for (key, value) in incoming {
3164        if let Some(existing_tag) = existing.iter_mut().find(|(k, _)| k == key) {
3165            existing_tag.1 = value.clone();
3166        } else {
3167            existing.push((key.clone(), value.clone()));
3168        }
3169    }
3170}
3171
3172fn tag_xml(tag: &(String, String)) -> String {
3173    format!(
3174        "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
3175        xml_escape(&tag.0),
3176        xml_escape(&tag.1),
3177    )
3178}
3179
3180// Filtering
3181
3182fn filter_engine_versions(
3183    versions: &[CacheEngineVersion],
3184    engine: &Option<String>,
3185    engine_version: &Option<String>,
3186    family: &Option<String>,
3187) -> Vec<CacheEngineVersion> {
3188    versions
3189        .iter()
3190        .filter(|v| engine.as_ref().is_none_or(|expected| v.engine == *expected))
3191        .filter(|v| {
3192            engine_version
3193                .as_ref()
3194                .is_none_or(|expected| v.engine_version == *expected)
3195        })
3196        .filter(|v| {
3197            family
3198                .as_ref()
3199                .is_none_or(|expected| v.cache_parameter_group_family == *expected)
3200        })
3201        .cloned()
3202        .collect()
3203}
3204
3205// XML formatting
3206
3207fn xml_wrap(action: &str, inner: &str, request_id: &str) -> String {
3208    fakecloud_core::query::query_response_xml(action, ELASTICACHE_NS, inner, request_id)
3209}
3210
3211fn engine_version_xml(v: &CacheEngineVersion) -> String {
3212    format!(
3213        "<CacheEngineVersion>\
3214         <Engine>{}</Engine>\
3215         <EngineVersion>{}</EngineVersion>\
3216         <CacheParameterGroupFamily>{}</CacheParameterGroupFamily>\
3217         <CacheEngineDescription>{}</CacheEngineDescription>\
3218         <CacheEngineVersionDescription>{}</CacheEngineVersionDescription>\
3219         </CacheEngineVersion>",
3220        xml_escape(&v.engine),
3221        xml_escape(&v.engine_version),
3222        xml_escape(&v.cache_parameter_group_family),
3223        xml_escape(&v.cache_engine_description),
3224        xml_escape(&v.cache_engine_version_description),
3225    )
3226}
3227
3228fn recurring_charge_xml(charge: &RecurringCharge) -> String {
3229    format!(
3230        "<RecurringCharge>\
3231         <RecurringChargeAmount>{}</RecurringChargeAmount>\
3232         <RecurringChargeFrequency>{}</RecurringChargeFrequency>\
3233         </RecurringCharge>",
3234        charge.recurring_charge_amount,
3235        xml_escape(&charge.recurring_charge_frequency),
3236    )
3237}
3238
3239fn reserved_cache_node_xml(node: &ReservedCacheNode) -> String {
3240    let recurring_charges_xml: String = node
3241        .recurring_charges
3242        .iter()
3243        .map(recurring_charge_xml)
3244        .collect();
3245
3246    format!(
3247        "<ReservedCacheNode>\
3248         <ReservedCacheNodeId>{}</ReservedCacheNodeId>\
3249         <ReservedCacheNodesOfferingId>{}</ReservedCacheNodesOfferingId>\
3250         <CacheNodeType>{}</CacheNodeType>\
3251         <StartTime>{}</StartTime>\
3252         <Duration>{}</Duration>\
3253         <FixedPrice>{}</FixedPrice>\
3254         <UsagePrice>{}</UsagePrice>\
3255         <CacheNodeCount>{}</CacheNodeCount>\
3256         <ProductDescription>{}</ProductDescription>\
3257         <OfferingType>{}</OfferingType>\
3258         <State>{}</State>\
3259         <RecurringCharges>{}</RecurringCharges>\
3260         <ReservationARN>{}</ReservationARN>\
3261         </ReservedCacheNode>",
3262        xml_escape(&node.reserved_cache_node_id),
3263        xml_escape(&node.reserved_cache_nodes_offering_id),
3264        xml_escape(&node.cache_node_type),
3265        xml_escape(&node.start_time),
3266        node.duration,
3267        node.fixed_price,
3268        node.usage_price,
3269        node.cache_node_count,
3270        xml_escape(&node.product_description),
3271        xml_escape(&node.offering_type),
3272        xml_escape(&node.state),
3273        recurring_charges_xml,
3274        xml_escape(&node.reservation_arn),
3275    )
3276}
3277
3278fn reserved_cache_nodes_offering_xml(offering: &ReservedCacheNodesOffering) -> String {
3279    let recurring_charges_xml: String = offering
3280        .recurring_charges
3281        .iter()
3282        .map(recurring_charge_xml)
3283        .collect();
3284
3285    format!(
3286        "<ReservedCacheNodesOffering>\
3287         <ReservedCacheNodesOfferingId>{}</ReservedCacheNodesOfferingId>\
3288         <CacheNodeType>{}</CacheNodeType>\
3289         <Duration>{}</Duration>\
3290         <FixedPrice>{}</FixedPrice>\
3291         <UsagePrice>{}</UsagePrice>\
3292         <ProductDescription>{}</ProductDescription>\
3293         <OfferingType>{}</OfferingType>\
3294         <RecurringCharges>{}</RecurringCharges>\
3295         </ReservedCacheNodesOffering>",
3296        xml_escape(&offering.reserved_cache_nodes_offering_id),
3297        xml_escape(&offering.cache_node_type),
3298        offering.duration,
3299        offering.fixed_price,
3300        offering.usage_price,
3301        xml_escape(&offering.product_description),
3302        xml_escape(&offering.offering_type),
3303        recurring_charges_xml,
3304    )
3305}
3306
3307fn cache_parameter_group_xml(g: &CacheParameterGroup) -> String {
3308    format!(
3309        "<CacheParameterGroup>\
3310         <CacheParameterGroupName>{}</CacheParameterGroupName>\
3311         <CacheParameterGroupFamily>{}</CacheParameterGroupFamily>\
3312         <Description>{}</Description>\
3313         <IsGlobal>{}</IsGlobal>\
3314         <ARN>{}</ARN>\
3315         </CacheParameterGroup>",
3316        xml_escape(&g.cache_parameter_group_name),
3317        xml_escape(&g.cache_parameter_group_family),
3318        xml_escape(&g.description),
3319        g.is_global,
3320        xml_escape(&g.arn),
3321    )
3322}
3323
3324fn cache_subnet_group_xml(g: &CacheSubnetGroup, region: &str) -> String {
3325    let subnets_xml: String = g
3326        .subnet_ids
3327        .iter()
3328        .enumerate()
3329        .map(|(i, id)| {
3330            let az = format!("{}{}", region, (b'a' + (i % 6) as u8) as char);
3331            format!(
3332                "<Subnet>\
3333                 <SubnetIdentifier>{}</SubnetIdentifier>\
3334                 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
3335                 </Subnet>",
3336                xml_escape(id),
3337                xml_escape(&az),
3338            )
3339        })
3340        .collect();
3341    format!(
3342        "<CacheSubnetGroupName>{}</CacheSubnetGroupName>\
3343         <CacheSubnetGroupDescription>{}</CacheSubnetGroupDescription>\
3344         <VpcId>{}</VpcId>\
3345         <Subnets>{subnets_xml}</Subnets>\
3346         <ARN>{}</ARN>",
3347        xml_escape(&g.cache_subnet_group_name),
3348        xml_escape(&g.cache_subnet_group_description),
3349        xml_escape(&g.vpc_id),
3350        xml_escape(&g.arn),
3351    )
3352}
3353
3354fn cache_cluster_xml(cluster: &CacheCluster, show_cache_node_info: bool) -> String {
3355    let cache_subnet_group_name_xml = cluster
3356        .cache_subnet_group_name
3357        .as_ref()
3358        .map(|name| {
3359            format!(
3360                "<CacheSubnetGroupName>{}</CacheSubnetGroupName>",
3361                xml_escape(name)
3362            )
3363        })
3364        .unwrap_or_default();
3365    let replication_group_id_xml = cluster
3366        .replication_group_id
3367        .as_ref()
3368        .map(|group_id| {
3369            format!(
3370                "<ReplicationGroupId>{}</ReplicationGroupId>",
3371                xml_escape(group_id)
3372            )
3373        })
3374        .unwrap_or_default();
3375    let cache_nodes_xml = if show_cache_node_info {
3376        match usize::try_from(cluster.num_cache_nodes) {
3377            Ok(node_count) => {
3378                let members: String = (0..node_count)
3379                    .filter_map(|index| {
3380                        let node_id = index.checked_add(1)?;
3381                        Some(cache_node_xml(cluster, node_id))
3382                    })
3383                    .collect();
3384                format!("<CacheNodes>{members}</CacheNodes>")
3385            }
3386            Err(_) => String::new(),
3387        }
3388    } else {
3389        String::new()
3390    };
3391
3392    format!(
3393        "<CacheClusterId>{}</CacheClusterId>\
3394         <CacheNodeType>{}</CacheNodeType>\
3395         <Engine>{}</Engine>\
3396         <EngineVersion>{}</EngineVersion>\
3397         <CacheClusterStatus>{}</CacheClusterStatus>\
3398         <NumCacheNodes>{}</NumCacheNodes>\
3399         <PreferredAvailabilityZone>{}</PreferredAvailabilityZone>\
3400         <CacheClusterCreateTime>{}</CacheClusterCreateTime>\
3401         {cache_subnet_group_name_xml}\
3402         {cache_nodes_xml}\
3403         <AutoMinorVersionUpgrade>{}</AutoMinorVersionUpgrade>\
3404         {replication_group_id_xml}\
3405         <ARN>{}</ARN>",
3406        xml_escape(&cluster.cache_cluster_id),
3407        xml_escape(&cluster.cache_node_type),
3408        xml_escape(&cluster.engine),
3409        xml_escape(&cluster.engine_version),
3410        xml_escape(&cluster.cache_cluster_status),
3411        cluster.num_cache_nodes,
3412        xml_escape(&cluster.preferred_availability_zone),
3413        xml_escape(&cluster.created_at),
3414        cluster.auto_minor_version_upgrade,
3415        xml_escape(&cluster.arn),
3416    )
3417}
3418
3419fn cache_node_xml(cluster: &CacheCluster, node_id: usize) -> String {
3420    format!(
3421        "<CacheNode>\
3422         <CacheNodeId>{node_id:04}</CacheNodeId>\
3423         <CacheNodeStatus>{}</CacheNodeStatus>\
3424         <CacheNodeCreateTime>{}</CacheNodeCreateTime>\
3425         <Endpoint><Address>{}</Address><Port>{}</Port></Endpoint>\
3426         <ParameterGroupStatus>in-sync</ParameterGroupStatus>\
3427         <CustomerAvailabilityZone>{}</CustomerAvailabilityZone>\
3428         </CacheNode>",
3429        xml_escape(&cluster.cache_cluster_status),
3430        xml_escape(&cluster.created_at),
3431        xml_escape(&cluster.endpoint_address),
3432        cluster.endpoint_port,
3433        xml_escape(&cluster.preferred_availability_zone),
3434    )
3435}
3436
3437fn replication_group_xml(g: &ReplicationGroup, region: &str) -> String {
3438    let member_clusters_xml: String = g
3439        .member_clusters
3440        .iter()
3441        .map(|c| format!("<ClusterId>{}</ClusterId>", xml_escape(c)))
3442        .collect();
3443    let global_replication_group_info_xml = g
3444        .global_replication_group_id
3445        .as_ref()
3446        .map(|global_replication_group_id| {
3447            format!(
3448                "<GlobalReplicationGroupInfo>\
3449                 <GlobalReplicationGroupId>{}</GlobalReplicationGroupId>\
3450                 <GlobalReplicationGroupMemberRole>{}</GlobalReplicationGroupMemberRole>\
3451                 </GlobalReplicationGroupInfo>",
3452                xml_escape(global_replication_group_id),
3453                xml_escape(
3454                    g.global_replication_group_role
3455                        .as_deref()
3456                        .unwrap_or("primary")
3457                ),
3458            )
3459        })
3460        .unwrap_or_default();
3461
3462    let primary_az = format!("{region}a");
3463
3464    format!(
3465        "<ReplicationGroupId>{}</ReplicationGroupId>\
3466         <Description>{}</Description>\
3467         {global_replication_group_info_xml}\
3468         <Status>{}</Status>\
3469         <MemberClusters>{member_clusters_xml}</MemberClusters>\
3470         <NodeGroups>\
3471         <NodeGroup>\
3472         <NodeGroupId>0001</NodeGroupId>\
3473         <Status>available</Status>\
3474         <PrimaryEndpoint>\
3475         <Address>{}</Address>\
3476         <Port>{}</Port>\
3477         </PrimaryEndpoint>\
3478         <ReaderEndpoint>\
3479         <Address>{}</Address>\
3480         <Port>{}</Port>\
3481         </ReaderEndpoint>\
3482         <NodeGroupMembers>\
3483         <NodeGroupMember>\
3484         <CacheClusterId>{}</CacheClusterId>\
3485         <CacheNodeId>0001</CacheNodeId>\
3486         <PreferredAvailabilityZone>{}</PreferredAvailabilityZone>\
3487         <CurrentRole>primary</CurrentRole>\
3488         </NodeGroupMember>\
3489         </NodeGroupMembers>\
3490         </NodeGroup>\
3491         </NodeGroups>\
3492         <AutomaticFailover>{}</AutomaticFailover>\
3493         <SnapshotRetentionLimit>{}</SnapshotRetentionLimit>\
3494         <SnapshotWindow>{}</SnapshotWindow>\
3495         <ClusterEnabled>false</ClusterEnabled>\
3496         <CacheNodeType>{}</CacheNodeType>\
3497         <TransitEncryptionEnabled>false</TransitEncryptionEnabled>\
3498         <AtRestEncryptionEnabled>false</AtRestEncryptionEnabled>\
3499         <ARN>{}</ARN>",
3500        xml_escape(&g.replication_group_id),
3501        xml_escape(&g.description),
3502        xml_escape(&g.status),
3503        xml_escape(&g.endpoint_address),
3504        g.endpoint_port,
3505        xml_escape(&g.endpoint_address),
3506        g.endpoint_port,
3507        xml_escape(g.member_clusters.first().map(|s| s.as_str()).unwrap_or("")),
3508        xml_escape(&primary_az),
3509        if g.automatic_failover_enabled {
3510            "enabled"
3511        } else {
3512            "disabled"
3513        },
3514        g.snapshot_retention_limit,
3515        xml_escape(&g.snapshot_window),
3516        xml_escape(&g.cache_node_type),
3517        xml_escape(&g.arn),
3518    )
3519}
3520
3521fn global_replication_group_id(region: &str, suffix: &str) -> String {
3522    format!("fc-{}-{}", region, suffix)
3523}
3524
3525fn primary_global_member(group: &GlobalReplicationGroup) -> Option<&GlobalReplicationGroupMember> {
3526    group.members.iter().find(|member| member.role == "primary")
3527}
3528
3529fn global_replication_group_xml(group: &GlobalReplicationGroup, show_member_info: bool) -> String {
3530    let members_xml = if show_member_info {
3531        let members_xml: String = group
3532            .members
3533            .iter()
3534            .map(global_replication_group_member_xml)
3535            .collect();
3536        format!("<Members>{members_xml}</Members>")
3537    } else {
3538        String::new()
3539    };
3540    let global_node_groups_xml = if group.cluster_enabled {
3541        "<GlobalNodeGroups><GlobalNodeGroup><GlobalNodeGroupId>0001</GlobalNodeGroupId><Slots>0-16383</Slots></GlobalNodeGroup></GlobalNodeGroups>".to_string()
3542    } else {
3543        String::new()
3544    };
3545
3546    format!(
3547        "<GlobalReplicationGroupId>{}</GlobalReplicationGroupId>\
3548         <GlobalReplicationGroupDescription>{}</GlobalReplicationGroupDescription>\
3549         <Status>{}</Status>\
3550         <CacheNodeType>{}</CacheNodeType>\
3551         <Engine>{}</Engine>\
3552         <EngineVersion>{}</EngineVersion>\
3553         {members_xml}\
3554         <ClusterEnabled>{}</ClusterEnabled>\
3555         {global_node_groups_xml}\
3556         <AuthTokenEnabled>false</AuthTokenEnabled>\
3557         <TransitEncryptionEnabled>false</TransitEncryptionEnabled>\
3558         <AtRestEncryptionEnabled>false</AtRestEncryptionEnabled>\
3559         <ARN>{}</ARN>",
3560        xml_escape(&group.global_replication_group_id),
3561        xml_escape(&group.global_replication_group_description),
3562        xml_escape(&group.status),
3563        xml_escape(&group.cache_node_type),
3564        xml_escape(&group.engine),
3565        xml_escape(&group.engine_version),
3566        group.cluster_enabled,
3567        xml_escape(&group.arn),
3568    )
3569}
3570
3571fn global_replication_group_member_xml(member: &GlobalReplicationGroupMember) -> String {
3572    format!(
3573        "<GlobalReplicationGroupMember>\
3574         <ReplicationGroupId>{}</ReplicationGroupId>\
3575         <ReplicationGroupRegion>{}</ReplicationGroupRegion>\
3576         <Role>{}</Role>\
3577         <AutomaticFailover>{}</AutomaticFailover>\
3578         <Status>{}</Status>\
3579         </GlobalReplicationGroupMember>",
3580        xml_escape(&member.replication_group_id),
3581        xml_escape(&member.replication_group_region),
3582        xml_escape(&member.role),
3583        if member.automatic_failover {
3584            "enabled"
3585        } else {
3586            "disabled"
3587        },
3588        xml_escape(&member.status),
3589    )
3590}
3591
3592fn user_xml(u: &ElastiCacheUser) -> String {
3593    let user_group_ids_xml: String = u
3594        .user_group_ids
3595        .iter()
3596        .map(|id| format!("<member>{}</member>", xml_escape(id)))
3597        .collect();
3598    format!(
3599        "<UserId>{}</UserId>\
3600         <UserName>{}</UserName>\
3601         <Status>{}</Status>\
3602         <Engine>{}</Engine>\
3603         <MinimumEngineVersion>{}</MinimumEngineVersion>\
3604         <AccessString>{}</AccessString>\
3605         <UserGroupIds>{user_group_ids_xml}</UserGroupIds>\
3606         <Authentication>\
3607         <Type>{}</Type>\
3608         <PasswordCount>{}</PasswordCount>\
3609         </Authentication>\
3610         <ARN>{}</ARN>",
3611        xml_escape(&u.user_id),
3612        xml_escape(&u.user_name),
3613        xml_escape(&u.status),
3614        xml_escape(&u.engine),
3615        xml_escape(&u.minimum_engine_version),
3616        xml_escape(&u.access_string),
3617        xml_escape(&u.authentication_type),
3618        u.password_count,
3619        xml_escape(&u.arn),
3620    )
3621}
3622
3623fn user_group_xml(g: &ElastiCacheUserGroup) -> String {
3624    let user_ids_xml: String = g
3625        .user_ids
3626        .iter()
3627        .map(|id| format!("<member>{}</member>", xml_escape(id)))
3628        .collect();
3629    let replication_groups_xml: String = g
3630        .replication_groups
3631        .iter()
3632        .map(|id| format!("<member>{}</member>", xml_escape(id)))
3633        .collect();
3634    let pending_xml = if let Some(ref pc) = g.pending_changes {
3635        let to_add: String = pc
3636            .user_ids_to_add
3637            .iter()
3638            .map(|id| format!("<member>{}</member>", xml_escape(id)))
3639            .collect();
3640        let to_remove: String = pc
3641            .user_ids_to_remove
3642            .iter()
3643            .map(|id| format!("<member>{}</member>", xml_escape(id)))
3644            .collect();
3645        format!(
3646            "<PendingChanges>\
3647             <UserIdsToAdd>{to_add}</UserIdsToAdd>\
3648             <UserIdsToRemove>{to_remove}</UserIdsToRemove>\
3649             </PendingChanges>"
3650        )
3651    } else {
3652        String::new()
3653    };
3654    format!(
3655        "<UserGroupId>{}</UserGroupId>\
3656         <Status>{}</Status>\
3657         <Engine>{}</Engine>\
3658         <MinimumEngineVersion>{}</MinimumEngineVersion>\
3659         <UserIds>{user_ids_xml}</UserIds>\
3660         <ReplicationGroups>{replication_groups_xml}</ReplicationGroups>\
3661         {pending_xml}\
3662         <ARN>{}</ARN>",
3663        xml_escape(&g.user_group_id),
3664        xml_escape(&g.status),
3665        xml_escape(&g.engine),
3666        xml_escape(&g.minimum_engine_version),
3667        xml_escape(&g.arn),
3668    )
3669}
3670
3671fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
3672    match error {
3673        RuntimeError::Unavailable => AwsServiceError::aws_error(
3674            StatusCode::SERVICE_UNAVAILABLE,
3675            "InvalidParameterValue",
3676            "Docker/Podman is required for ElastiCache replication groups but is not available"
3677                .to_string(),
3678        ),
3679        RuntimeError::ContainerStartFailed(msg) => AwsServiceError::aws_error(
3680            StatusCode::INTERNAL_SERVER_ERROR,
3681            "InvalidParameterValue",
3682            format!("Failed to start Redis container: {msg}"),
3683        ),
3684    }
3685}
3686
3687fn add_cluster_to_replication_group(
3688    state: &mut ElastiCacheState,
3689    replication_group_id: &str,
3690    cache_cluster_id: &str,
3691) {
3692    if let Some(group) = state.replication_groups.get_mut(replication_group_id) {
3693        if !group
3694            .member_clusters
3695            .iter()
3696            .any(|id| id == cache_cluster_id)
3697        {
3698            group.member_clusters.push(cache_cluster_id.to_string());
3699            group.num_cache_clusters = group.member_clusters.len() as i32;
3700        }
3701    }
3702}
3703
3704fn remove_cluster_from_replication_group(
3705    state: &mut ElastiCacheState,
3706    replication_group_id: &str,
3707    cache_cluster_id: &str,
3708) {
3709    if let Some(group) = state.replication_groups.get_mut(replication_group_id) {
3710        let original_len = group.member_clusters.len();
3711        group.member_clusters.retain(|id| id != cache_cluster_id);
3712        if group.member_clusters.len() != original_len {
3713            group.num_cache_clusters = group.member_clusters.len() as i32;
3714        }
3715    }
3716}
3717
3718fn snapshot_xml(s: &CacheSnapshot) -> String {
3719    format!(
3720        "<SnapshotName>{}</SnapshotName>\
3721         <ReplicationGroupId>{}</ReplicationGroupId>\
3722         <ReplicationGroupDescription>{}</ReplicationGroupDescription>\
3723         <SnapshotStatus>{}</SnapshotStatus>\
3724         <SnapshotSource>{}</SnapshotSource>\
3725         <CacheNodeType>{}</CacheNodeType>\
3726         <Engine>{}</Engine>\
3727         <EngineVersion>{}</EngineVersion>\
3728         <NumCacheClusters>{}</NumCacheClusters>\
3729         <ARN>{}</ARN>",
3730        xml_escape(&s.snapshot_name),
3731        xml_escape(&s.replication_group_id),
3732        xml_escape(&s.replication_group_description),
3733        xml_escape(&s.snapshot_status),
3734        xml_escape(&s.snapshot_source),
3735        xml_escape(&s.cache_node_type),
3736        xml_escape(&s.engine),
3737        xml_escape(&s.engine_version),
3738        s.num_cache_clusters,
3739        xml_escape(&s.arn),
3740    )
3741}
3742
3743fn serverless_cache_xml(cache: &ServerlessCache) -> String {
3744    let cache_usage_limits_xml = cache
3745        .cache_usage_limits
3746        .as_ref()
3747        .map(serverless_cache_usage_limits_xml)
3748        .unwrap_or_default();
3749    let kms_key_id_xml = cache
3750        .kms_key_id
3751        .as_ref()
3752        .map(|value| format!("<KmsKeyId>{}</KmsKeyId>", xml_escape(value)))
3753        .unwrap_or_default();
3754    let security_group_ids_xml = if cache.security_group_ids.is_empty() {
3755        String::new()
3756    } else {
3757        let members: String = cache
3758            .security_group_ids
3759            .iter()
3760            .map(|id| format!("<SecurityGroupId>{}</SecurityGroupId>", xml_escape(id)))
3761            .collect();
3762        format!("<SecurityGroupIds>{members}</SecurityGroupIds>")
3763    };
3764    let subnet_ids_xml = if cache.subnet_ids.is_empty() {
3765        String::new()
3766    } else {
3767        let members: String = cache
3768            .subnet_ids
3769            .iter()
3770            .map(|id| format!("<member>{}</member>", xml_escape(id)))
3771            .collect();
3772        format!("<SubnetIds>{members}</SubnetIds>")
3773    };
3774    let user_group_id_xml = cache
3775        .user_group_id
3776        .as_ref()
3777        .map(|value| format!("<UserGroupId>{}</UserGroupId>", xml_escape(value)))
3778        .unwrap_or_default();
3779    let snapshot_retention_limit_xml = cache
3780        .snapshot_retention_limit
3781        .map(|value| format!("<SnapshotRetentionLimit>{value}</SnapshotRetentionLimit>"))
3782        .unwrap_or_default();
3783    let daily_snapshot_time_xml = cache
3784        .daily_snapshot_time
3785        .as_ref()
3786        .map(|value| {
3787            format!(
3788                "<DailySnapshotTime>{}</DailySnapshotTime>",
3789                xml_escape(value)
3790            )
3791        })
3792        .unwrap_or_default();
3793
3794    format!(
3795        "<ServerlessCacheName>{}</ServerlessCacheName>\
3796         <Description>{}</Description>\
3797         <CreateTime>{}</CreateTime>\
3798         <Status>{}</Status>\
3799         <Engine>{}</Engine>\
3800         <MajorEngineVersion>{}</MajorEngineVersion>\
3801         <FullEngineVersion>{}</FullEngineVersion>\
3802         {cache_usage_limits_xml}\
3803         {kms_key_id_xml}\
3804         {security_group_ids_xml}\
3805         <Endpoint>{}</Endpoint>\
3806         <ReaderEndpoint>{}</ReaderEndpoint>\
3807         <ARN>{}</ARN>\
3808         {user_group_id_xml}\
3809         {subnet_ids_xml}\
3810         {snapshot_retention_limit_xml}\
3811         {daily_snapshot_time_xml}",
3812        xml_escape(&cache.serverless_cache_name),
3813        xml_escape(&cache.description),
3814        xml_escape(&cache.created_at),
3815        xml_escape(&cache.status),
3816        xml_escape(&cache.engine),
3817        xml_escape(&cache.major_engine_version),
3818        xml_escape(&cache.full_engine_version),
3819        serverless_cache_endpoint_xml(&cache.endpoint),
3820        serverless_cache_endpoint_xml(&cache.reader_endpoint),
3821        xml_escape(&cache.arn),
3822    )
3823}
3824
3825fn serverless_cache_usage_limits_xml(limits: &ServerlessCacheUsageLimits) -> String {
3826    let data_storage_xml = limits
3827        .data_storage
3828        .as_ref()
3829        .map(|data_storage| {
3830            let maximum_xml = data_storage
3831                .maximum
3832                .map(|value| format!("<Maximum>{value}</Maximum>"))
3833                .unwrap_or_default();
3834            let minimum_xml = data_storage
3835                .minimum
3836                .map(|value| format!("<Minimum>{value}</Minimum>"))
3837                .unwrap_or_default();
3838            let unit_xml = data_storage
3839                .unit
3840                .as_ref()
3841                .map(|value| format!("<Unit>{}</Unit>", xml_escape(value)))
3842                .unwrap_or_default();
3843            format!("<DataStorage>{maximum_xml}{minimum_xml}{unit_xml}</DataStorage>")
3844        })
3845        .unwrap_or_default();
3846    let ecpu_per_second_xml = limits
3847        .ecpu_per_second
3848        .as_ref()
3849        .map(|ecpu| {
3850            let maximum_xml = ecpu
3851                .maximum
3852                .map(|value| format!("<Maximum>{value}</Maximum>"))
3853                .unwrap_or_default();
3854            let minimum_xml = ecpu
3855                .minimum
3856                .map(|value| format!("<Minimum>{value}</Minimum>"))
3857                .unwrap_or_default();
3858            format!("<ECPUPerSecond>{maximum_xml}{minimum_xml}</ECPUPerSecond>")
3859        })
3860        .unwrap_or_default();
3861
3862    format!("<CacheUsageLimits>{data_storage_xml}{ecpu_per_second_xml}</CacheUsageLimits>")
3863}
3864
3865fn serverless_cache_endpoint_xml(endpoint: &ServerlessCacheEndpoint) -> String {
3866    format!(
3867        "<Address>{}</Address><Port>{}</Port>",
3868        xml_escape(&endpoint.address),
3869        endpoint.port,
3870    )
3871}
3872
3873fn serverless_cache_snapshot_xml(snapshot: &ServerlessCacheSnapshot) -> String {
3874    let kms_key_id_xml = snapshot
3875        .kms_key_id
3876        .as_ref()
3877        .map(|value| format!("<KmsKeyId>{}</KmsKeyId>", xml_escape(value)))
3878        .unwrap_or_default();
3879    let expiry_time_xml = snapshot
3880        .expiry_time
3881        .as_ref()
3882        .map(|value| format!("<ExpiryTime>{}</ExpiryTime>", xml_escape(value)))
3883        .unwrap_or_default();
3884    let bytes_used_for_cache_xml = snapshot
3885        .bytes_used_for_cache
3886        .as_ref()
3887        .map(|value| {
3888            format!(
3889                "<BytesUsedForCache>{}</BytesUsedForCache>",
3890                xml_escape(value)
3891            )
3892        })
3893        .unwrap_or_default();
3894
3895    format!(
3896        "<ServerlessCacheSnapshotName>{}</ServerlessCacheSnapshotName>\
3897         <ARN>{}</ARN>\
3898         {kms_key_id_xml}\
3899         <SnapshotType>{}</SnapshotType>\
3900         <Status>{}</Status>\
3901         <CreateTime>{}</CreateTime>\
3902         {expiry_time_xml}\
3903         {bytes_used_for_cache_xml}\
3904         <ServerlessCacheConfiguration>\
3905         <ServerlessCacheName>{}</ServerlessCacheName>\
3906         <Engine>{}</Engine>\
3907         <MajorEngineVersion>{}</MajorEngineVersion>\
3908         </ServerlessCacheConfiguration>",
3909        xml_escape(&snapshot.serverless_cache_snapshot_name),
3910        xml_escape(&snapshot.arn),
3911        xml_escape(&snapshot.snapshot_type),
3912        xml_escape(&snapshot.status),
3913        xml_escape(&snapshot.create_time),
3914        xml_escape(&snapshot.serverless_cache_name),
3915        xml_escape(&snapshot.engine),
3916        xml_escape(&snapshot.major_engine_version),
3917    )
3918}
3919
3920fn parameter_xml(p: &EngineDefaultParameter) -> String {
3921    format!(
3922        "<Parameter>\
3923         <ParameterName>{}</ParameterName>\
3924         <ParameterValue>{}</ParameterValue>\
3925         <Description>{}</Description>\
3926         <Source>{}</Source>\
3927         <DataType>{}</DataType>\
3928         <AllowedValues>{}</AllowedValues>\
3929         <IsModifiable>{}</IsModifiable>\
3930         <MinimumEngineVersion>{}</MinimumEngineVersion>\
3931         </Parameter>",
3932        xml_escape(&p.parameter_name),
3933        xml_escape(&p.parameter_value),
3934        xml_escape(&p.description),
3935        xml_escape(&p.source),
3936        xml_escape(&p.data_type),
3937        xml_escape(&p.allowed_values),
3938        p.is_modifiable,
3939        xml_escape(&p.minimum_engine_version),
3940    )
3941}
3942
3943#[cfg(test)]
3944mod tests {
3945    use super::*;
3946    use crate::state::default_engine_versions;
3947    use bytes::Bytes;
3948    use http::{HeaderMap, Method};
3949    use std::collections::HashMap;
3950
3951    fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
3952        let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
3953        for (key, value) in params {
3954            query_params.insert((*key).to_string(), (*value).to_string());
3955        }
3956
3957        AwsRequest {
3958            service: "elasticache".to_string(),
3959            action: action.to_string(),
3960            region: "us-east-1".to_string(),
3961            account_id: "123456789012".to_string(),
3962            request_id: "test-request-id".to_string(),
3963            headers: HeaderMap::new(),
3964            query_params,
3965            body: Bytes::new(),
3966            path_segments: vec![],
3967            raw_path: "/".to_string(),
3968            raw_query: String::new(),
3969            method: http::Method::POST,
3970            is_query_protocol: true,
3971            access_key_id: None,
3972            principal: None,
3973        }
3974    }
3975
3976    fn sample_reserved_cache_node_offering(id: &str) -> ReservedCacheNodesOffering {
3977        ReservedCacheNodesOffering {
3978            reserved_cache_nodes_offering_id: id.to_string(),
3979            cache_node_type: "cache.t3.micro".to_string(),
3980            duration: 31_536_000,
3981            fixed_price: 0.0,
3982            usage_price: 0.011,
3983            product_description: "redis".to_string(),
3984            offering_type: "No Upfront".to_string(),
3985            recurring_charges: Vec::new(),
3986        }
3987    }
3988
3989    fn sample_reserved_cache_node(id: &str, offering_id: &str) -> ReservedCacheNode {
3990        ReservedCacheNode {
3991            reserved_cache_node_id: id.to_string(),
3992            reserved_cache_nodes_offering_id: offering_id.to_string(),
3993            cache_node_type: "cache.t3.micro".to_string(),
3994            start_time: "2024-01-01T00:00:00Z".to_string(),
3995            duration: 31_536_000,
3996            fixed_price: 0.0,
3997            usage_price: 0.011,
3998            cache_node_count: 1,
3999            product_description: "redis".to_string(),
4000            offering_type: "No Upfront".to_string(),
4001            state: "payment-pending".to_string(),
4002            recurring_charges: Vec::new(),
4003            reservation_arn: "arn:aws:elasticache:us-east-1:123456789012:reserved-instance:test"
4004                .to_string(),
4005        }
4006    }
4007
4008    #[test]
4009    fn parse_member_list_extracts_indexed_values() {
4010        let mut params = HashMap::new();
4011        params.insert(
4012            "SubnetIds.SubnetIdentifier.1".to_string(),
4013            "subnet-aaa".to_string(),
4014        );
4015        params.insert(
4016            "SubnetIds.SubnetIdentifier.2".to_string(),
4017            "subnet-bbb".to_string(),
4018        );
4019        params.insert(
4020            "SubnetIds.SubnetIdentifier.3".to_string(),
4021            "subnet-ccc".to_string(),
4022        );
4023        params.insert("OtherParam".to_string(), "ignored".to_string());
4024
4025        let result = parse_member_list(&params, "SubnetIds", "SubnetIdentifier");
4026        assert_eq!(result, vec!["subnet-aaa", "subnet-bbb", "subnet-ccc"]);
4027    }
4028
4029    #[test]
4030    fn parse_member_list_returns_sorted_by_index() {
4031        let mut params = HashMap::new();
4032        params.insert(
4033            "SubnetIds.SubnetIdentifier.3".to_string(),
4034            "subnet-ccc".to_string(),
4035        );
4036        params.insert(
4037            "SubnetIds.SubnetIdentifier.1".to_string(),
4038            "subnet-aaa".to_string(),
4039        );
4040
4041        let result = parse_member_list(&params, "SubnetIds", "SubnetIdentifier");
4042        assert_eq!(result, vec!["subnet-aaa", "subnet-ccc"]);
4043    }
4044
4045    #[test]
4046    fn parse_member_list_returns_empty_for_no_matches() {
4047        let params = HashMap::new();
4048        let result = parse_member_list(&params, "SubnetIds", "SubnetIdentifier");
4049        assert!(result.is_empty());
4050    }
4051
4052    #[test]
4053    fn cache_subnet_group_xml_contains_all_fields() {
4054        let group = CacheSubnetGroup {
4055            cache_subnet_group_name: "my-group".to_string(),
4056            cache_subnet_group_description: "My description".to_string(),
4057            vpc_id: "vpc-123".to_string(),
4058            subnet_ids: vec!["subnet-aaa".to_string(), "subnet-bbb".to_string()],
4059            arn: "arn:aws:elasticache:us-east-1:123:subnetgroup:my-group".to_string(),
4060        };
4061        let xml = cache_subnet_group_xml(&group, "us-east-1");
4062        assert!(xml.contains("<CacheSubnetGroupName>my-group</CacheSubnetGroupName>"));
4063        assert!(xml
4064            .contains("<CacheSubnetGroupDescription>My description</CacheSubnetGroupDescription>"));
4065        assert!(xml.contains("<VpcId>vpc-123</VpcId>"));
4066        assert!(xml.contains("<SubnetIdentifier>subnet-aaa</SubnetIdentifier>"));
4067        assert!(xml.contains("<SubnetIdentifier>subnet-bbb</SubnetIdentifier>"));
4068        assert!(xml.contains("<Name>us-east-1a</Name>"));
4069        assert!(xml.contains("<Name>us-east-1b</Name>"));
4070        assert!(xml.contains("<ARN>arn:aws:elasticache:us-east-1:123:subnetgroup:my-group</ARN>"));
4071    }
4072
4073    #[test]
4074    fn cache_cluster_xml_contains_expected_fields() {
4075        let cluster = CacheCluster {
4076            cache_cluster_id: "classic-1".to_string(),
4077            cache_node_type: "cache.t3.micro".to_string(),
4078            engine: "redis".to_string(),
4079            engine_version: "7.1".to_string(),
4080            cache_cluster_status: "available".to_string(),
4081            num_cache_nodes: 2,
4082            preferred_availability_zone: "us-east-1a".to_string(),
4083            cache_subnet_group_name: Some("default".to_string()),
4084            auto_minor_version_upgrade: true,
4085            arn: "arn:aws:elasticache:us-east-1:123:cluster:classic-1".to_string(),
4086            created_at: "2024-01-01T00:00:00Z".to_string(),
4087            endpoint_address: "127.0.0.1".to_string(),
4088            endpoint_port: 6379,
4089            container_id: "abc123".to_string(),
4090            host_port: 6379,
4091            replication_group_id: Some("rg-1".to_string()),
4092        };
4093
4094        let xml = cache_cluster_xml(&cluster, true);
4095        assert!(xml.contains("<CacheClusterId>classic-1</CacheClusterId>"));
4096        assert!(xml.contains("<CacheNodeType>cache.t3.micro</CacheNodeType>"));
4097        assert!(xml.contains("<Engine>redis</Engine>"));
4098        assert!(xml.contains("<NumCacheNodes>2</NumCacheNodes>"));
4099        assert!(xml.contains("<PreferredAvailabilityZone>us-east-1a</PreferredAvailabilityZone>"));
4100        assert!(xml.contains("<CacheSubnetGroupName>default</CacheSubnetGroupName>"));
4101        assert!(xml.contains("<CacheNodes>"));
4102        assert!(xml.contains("<CacheNodeId>0001</CacheNodeId>"));
4103        assert!(xml.contains("<ReplicationGroupId>rg-1</ReplicationGroupId>"));
4104        assert!(xml.contains("<ARN>arn:aws:elasticache:us-east-1:123:cluster:classic-1</ARN>"));
4105    }
4106
4107    #[test]
4108    fn filter_engine_versions_by_engine() {
4109        let versions = default_engine_versions();
4110        let filtered = filter_engine_versions(&versions, &Some("redis".to_string()), &None, &None);
4111        assert_eq!(filtered.len(), 1);
4112        assert_eq!(filtered[0].engine, "redis");
4113    }
4114
4115    #[test]
4116    fn filter_engine_versions_by_family() {
4117        let versions = default_engine_versions();
4118        let filtered =
4119            filter_engine_versions(&versions, &None, &None, &Some("valkey8".to_string()));
4120        assert_eq!(filtered.len(), 1);
4121        assert_eq!(filtered[0].engine, "valkey");
4122    }
4123
4124    #[test]
4125    fn filter_engine_versions_no_match() {
4126        let versions = default_engine_versions();
4127        let filtered =
4128            filter_engine_versions(&versions, &Some("memcached".to_string()), &None, &None);
4129        assert!(filtered.is_empty());
4130    }
4131
4132    #[test]
4133    fn paginate_returns_all_when_within_limit() {
4134        let items = vec![1, 2, 3];
4135        let (page, marker) = paginate(&items, None, None);
4136        assert_eq!(page, vec![1, 2, 3]);
4137        assert!(marker.is_none());
4138    }
4139
4140    #[test]
4141    fn paginate_respects_max_records() {
4142        let items = vec![1, 2, 3, 4, 5];
4143        let (page, marker) = paginate(&items, None, Some(2));
4144        assert_eq!(page, vec![1, 2]);
4145        assert_eq!(marker, Some("2".to_string()));
4146
4147        let (page2, marker2) = paginate(&items, Some("2"), Some(2));
4148        assert_eq!(page2, vec![3, 4]);
4149        assert_eq!(marker2, Some("4".to_string()));
4150
4151        let (page3, marker3) = paginate(&items, Some("4"), Some(2));
4152        assert_eq!(page3, vec![5]);
4153        assert!(marker3.is_none());
4154    }
4155
4156    #[test]
4157    fn parse_reserved_duration_filter_accepts_years_and_seconds() {
4158        assert_eq!(
4159            parse_reserved_duration_filter(Some("1".to_string())).unwrap(),
4160            Some(31_536_000)
4161        );
4162        assert_eq!(
4163            parse_reserved_duration_filter(Some("94608000".to_string())).unwrap(),
4164            Some(94_608_000)
4165        );
4166    }
4167
4168    #[test]
4169    fn parse_reserved_duration_filter_rejects_invalid_value() {
4170        assert!(parse_reserved_duration_filter(Some("2".to_string())).is_err());
4171    }
4172
4173    #[test]
4174    fn xml_wrap_produces_valid_response() {
4175        let xml = xml_wrap("TestAction", "<Data>ok</Data>", "req-123");
4176        assert!(xml.contains("<TestActionResponse"));
4177        assert!(xml.contains("<TestActionResult>"));
4178        assert!(xml.contains("<RequestId>req-123</RequestId>"));
4179        assert!(xml.contains(ELASTICACHE_NS));
4180    }
4181
4182    #[test]
4183    fn parse_tags_reads_query_shape() {
4184        let req = request(
4185            "AddTagsToResource",
4186            &[
4187                ("Tags.Tag.1.Key", "env"),
4188                ("Tags.Tag.1.Value", "prod"),
4189                ("Tags.Tag.2.Key", "team"),
4190                ("Tags.Tag.2.Value", "backend"),
4191            ],
4192        );
4193
4194        let tags = parse_tags(&req).expect("tags");
4195        assert_eq!(
4196            tags,
4197            vec![
4198                ("env".to_string(), "prod".to_string()),
4199                ("team".to_string(), "backend".to_string()),
4200            ]
4201        );
4202    }
4203
4204    #[test]
4205    fn parse_tags_returns_empty_for_no_tags() {
4206        let req = request("AddTagsToResource", &[]);
4207        let tags = parse_tags(&req).expect("tags");
4208        assert!(tags.is_empty());
4209    }
4210
4211    #[test]
4212    fn parse_tag_keys_reads_member_shape() {
4213        let req = request(
4214            "RemoveTagsFromResource",
4215            &[("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")],
4216        );
4217
4218        let keys = parse_tag_keys(&req).expect("tag keys");
4219        assert_eq!(keys, vec!["env".to_string(), "team".to_string()]);
4220    }
4221
4222    #[test]
4223    fn merge_tags_adds_new_and_updates_existing() {
4224        let mut tags = vec![("env".to_string(), "dev".to_string())];
4225
4226        merge_tags(
4227            &mut tags,
4228            &[
4229                ("env".to_string(), "prod".to_string()),
4230                ("team".to_string(), "core".to_string()),
4231            ],
4232        );
4233
4234        assert_eq!(
4235            tags,
4236            vec![
4237                ("env".to_string(), "prod".to_string()),
4238                ("team".to_string(), "core".to_string()),
4239            ]
4240        );
4241    }
4242
4243    #[test]
4244    fn tag_xml_produces_valid_element() {
4245        let xml = tag_xml(&("env".to_string(), "prod".to_string()));
4246        assert_eq!(xml, "<Tag><Key>env</Key><Value>prod</Value></Tag>");
4247    }
4248
4249    #[test]
4250    fn reserved_cache_nodes_offering_xml_contains_expected_fields() {
4251        let xml = reserved_cache_nodes_offering_xml(&ReservedCacheNodesOffering {
4252            reserved_cache_nodes_offering_id: "offering-a".to_string(),
4253            cache_node_type: "cache.r6g.large".to_string(),
4254            duration: 94_608_000,
4255            fixed_price: 1550.0,
4256            usage_price: 0.0,
4257            product_description: "redis".to_string(),
4258            offering_type: "All Upfront".to_string(),
4259            recurring_charges: vec![RecurringCharge {
4260                recurring_charge_amount: 0.0,
4261                recurring_charge_frequency: "Hourly".to_string(),
4262            }],
4263        });
4264        assert!(
4265            xml.contains("<ReservedCacheNodesOfferingId>offering-a</ReservedCacheNodesOfferingId>")
4266        );
4267        assert!(xml.contains("<CacheNodeType>cache.r6g.large</CacheNodeType>"));
4268        assert!(xml.contains("<Duration>94608000</Duration>"));
4269        assert!(xml.contains("<OfferingType>All Upfront</OfferingType>"));
4270        assert!(xml.contains("<RecurringChargeFrequency>Hourly</RecurringChargeFrequency>"));
4271    }
4272
4273    #[test]
4274    fn reserved_cache_node_xml_contains_expected_fields() {
4275        let xml = reserved_cache_node_xml(&sample_reserved_cache_node("rcn-a", "offering-a"));
4276        assert!(xml.contains("<ReservedCacheNodeId>rcn-a</ReservedCacheNodeId>"));
4277        assert!(
4278            xml.contains("<ReservedCacheNodesOfferingId>offering-a</ReservedCacheNodesOfferingId>")
4279        );
4280        assert!(xml.contains("<StartTime>2024-01-01T00:00:00Z</StartTime>"));
4281        assert!(xml.contains("<State>payment-pending</State>"));
4282        assert!(xml.contains("<ReservationARN>"));
4283    }
4284
4285    #[test]
4286    fn user_xml_contains_all_fields() {
4287        let user = ElastiCacheUser {
4288            user_id: "myuser".to_string(),
4289            user_name: "myuser".to_string(),
4290            engine: "redis".to_string(),
4291            access_string: "on ~* +@all".to_string(),
4292            status: "active".to_string(),
4293            authentication_type: "password".to_string(),
4294            password_count: 1,
4295            arn: "arn:aws:elasticache:us-east-1:123:user:myuser".to_string(),
4296            minimum_engine_version: "6.0".to_string(),
4297            user_group_ids: vec!["group1".to_string()],
4298        };
4299        let xml = user_xml(&user);
4300        assert!(xml.contains("<UserId>myuser</UserId>"));
4301        assert!(xml.contains("<UserName>myuser</UserName>"));
4302        assert!(xml.contains("<Engine>redis</Engine>"));
4303        assert!(xml.contains("<AccessString>on ~* +@all</AccessString>"));
4304        assert!(xml.contains("<Status>active</Status>"));
4305        assert!(xml.contains("<Type>password</Type>"));
4306        assert!(xml.contains("<PasswordCount>1</PasswordCount>"));
4307        assert!(xml.contains("<member>group1</member>"));
4308        assert!(xml.contains("<ARN>arn:aws:elasticache:us-east-1:123:user:myuser</ARN>"));
4309    }
4310
4311    #[test]
4312    fn user_group_xml_contains_all_fields() {
4313        let group = ElastiCacheUserGroup {
4314            user_group_id: "mygroup".to_string(),
4315            engine: "redis".to_string(),
4316            status: "active".to_string(),
4317            user_ids: vec!["default".to_string(), "myuser".to_string()],
4318            arn: "arn:aws:elasticache:us-east-1:123:usergroup:mygroup".to_string(),
4319            minimum_engine_version: "6.0".to_string(),
4320            pending_changes: None,
4321            replication_groups: Vec::new(),
4322        };
4323        let xml = user_group_xml(&group);
4324        assert!(xml.contains("<UserGroupId>mygroup</UserGroupId>"));
4325        assert!(xml.contains("<Engine>redis</Engine>"));
4326        assert!(xml.contains("<Status>active</Status>"));
4327        assert!(xml.contains("<member>default</member>"));
4328        assert!(xml.contains("<member>myuser</member>"));
4329        assert!(xml.contains("<ARN>arn:aws:elasticache:us-east-1:123:usergroup:mygroup</ARN>"));
4330    }
4331
4332    #[test]
4333    fn create_user_returns_user_xml() {
4334        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4335        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4336        let service = ElastiCacheService::new(shared);
4337
4338        let req = request(
4339            "CreateUser",
4340            &[
4341                ("UserId", "testuser"),
4342                ("UserName", "testuser"),
4343                ("Engine", "redis"),
4344                ("AccessString", "on ~* +@all"),
4345            ],
4346        );
4347        let resp = service.create_user(&req).unwrap();
4348        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
4349        assert!(body.contains("<UserId>testuser</UserId>"));
4350        assert!(body.contains("<Status>active</Status>"));
4351        assert!(body.contains("<CreateUserResponse"));
4352    }
4353
4354    #[test]
4355    fn create_user_rejects_duplicate() {
4356        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4357        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4358        let service = ElastiCacheService::new(shared);
4359
4360        let req = request(
4361            "CreateUser",
4362            &[
4363                ("UserId", "default"),
4364                ("UserName", "default"),
4365                ("Engine", "redis"),
4366                ("AccessString", "on ~* +@all"),
4367            ],
4368        );
4369        assert!(service.create_user(&req).is_err());
4370    }
4371
4372    #[test]
4373    fn delete_user_rejects_default() {
4374        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4375        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4376        let service = ElastiCacheService::new(shared);
4377
4378        let req = request("DeleteUser", &[("UserId", "default")]);
4379        assert!(service.delete_user(&req).is_err());
4380    }
4381
4382    #[test]
4383    fn describe_users_returns_default_user() {
4384        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4385        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4386        let service = ElastiCacheService::new(shared);
4387
4388        let req = request("DescribeUsers", &[]);
4389        let resp = service.describe_users(&req).unwrap();
4390        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
4391        assert!(body.contains("<UserId>default</UserId>"));
4392    }
4393
4394    #[test]
4395    fn describe_reserved_cache_nodes_returns_empty_list_by_default() {
4396        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4397        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4398        let service = ElastiCacheService::new(shared);
4399
4400        let resp = service
4401            .describe_reserved_cache_nodes(&request("DescribeReservedCacheNodes", &[]))
4402            .unwrap();
4403        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
4404        assert!(body.contains("<ReservedCacheNodes></ReservedCacheNodes>"));
4405    }
4406
4407    #[test]
4408    fn describe_reserved_cache_nodes_filters_by_offering_id() {
4409        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4410        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4411        let service = ElastiCacheService::new(shared);
4412        {
4413            let mut state = service.state.write();
4414            state.reserved_cache_nodes.insert(
4415                "rcn-a".to_string(),
4416                sample_reserved_cache_node("rcn-a", "offering-a"),
4417            );
4418            state.reserved_cache_nodes.insert(
4419                "rcn-b".to_string(),
4420                sample_reserved_cache_node("rcn-b", "offering-b"),
4421            );
4422        }
4423
4424        let resp = service
4425            .describe_reserved_cache_nodes(&request(
4426                "DescribeReservedCacheNodes",
4427                &[("ReservedCacheNodesOfferingId", "offering-b")],
4428            ))
4429            .unwrap();
4430        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
4431        assert!(body.contains("<ReservedCacheNodeId>rcn-b</ReservedCacheNodeId>"));
4432        assert!(!body.contains("<ReservedCacheNodeId>rcn-a</ReservedCacheNodeId>"));
4433    }
4434
4435    #[test]
4436    fn describe_reserved_cache_nodes_not_found_by_id() {
4437        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4438        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4439        let service = ElastiCacheService::new(shared);
4440
4441        assert!(service
4442            .describe_reserved_cache_nodes(&request(
4443                "DescribeReservedCacheNodes",
4444                &[("ReservedCacheNodeId", "missing")],
4445            ))
4446            .is_err());
4447    }
4448
4449    #[test]
4450    fn describe_reserved_cache_nodes_offerings_filters_and_paginates() {
4451        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4452        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4453        let service = ElastiCacheService::new(shared);
4454        {
4455            let mut state = service.state.write();
4456            state.reserved_cache_nodes_offerings = vec![
4457                sample_reserved_cache_node_offering("offering-a"),
4458                ReservedCacheNodesOffering {
4459                    reserved_cache_nodes_offering_id: "offering-b".to_string(),
4460                    cache_node_type: "cache.m5.large".to_string(),
4461                    duration: 94_608_000,
4462                    fixed_price: 0.0,
4463                    usage_price: 0.033,
4464                    product_description: "memcached".to_string(),
4465                    offering_type: "No Upfront".to_string(),
4466                    recurring_charges: Vec::new(),
4467                },
4468                ReservedCacheNodesOffering {
4469                    reserved_cache_nodes_offering_id: "offering-c".to_string(),
4470                    cache_node_type: "cache.r6g.large".to_string(),
4471                    duration: 94_608_000,
4472                    fixed_price: 1_550.0,
4473                    usage_price: 0.0,
4474                    product_description: "redis".to_string(),
4475                    offering_type: "All Upfront".to_string(),
4476                    recurring_charges: vec![RecurringCharge {
4477                        recurring_charge_amount: 0.0,
4478                        recurring_charge_frequency: "Hourly".to_string(),
4479                    }],
4480                },
4481            ];
4482        }
4483
4484        let filtered = service
4485            .describe_reserved_cache_nodes_offerings(&request(
4486                "DescribeReservedCacheNodesOfferings",
4487                &[("ProductDescription", "redis"), ("Duration", "3")],
4488            ))
4489            .unwrap();
4490        let filtered_body = String::from_utf8(filtered.body.expect_bytes().to_vec()).unwrap();
4491        assert!(filtered_body
4492            .contains("<ReservedCacheNodesOfferingId>offering-c</ReservedCacheNodesOfferingId>"));
4493        assert!(!filtered_body
4494            .contains("<ReservedCacheNodesOfferingId>offering-b</ReservedCacheNodesOfferingId>"));
4495
4496        let paged = service
4497            .describe_reserved_cache_nodes_offerings(&request(
4498                "DescribeReservedCacheNodesOfferings",
4499                &[("MaxRecords", "1")],
4500            ))
4501            .unwrap();
4502        let paged_body = String::from_utf8(paged.body.expect_bytes().to_vec()).unwrap();
4503        assert!(paged_body.contains("<Marker>1</Marker>"));
4504    }
4505
4506    #[test]
4507    fn describe_reserved_cache_nodes_offerings_not_found_by_id() {
4508        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4509        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4510        let service = ElastiCacheService::new(shared);
4511
4512        assert!(service
4513            .describe_reserved_cache_nodes_offerings(&request(
4514                "DescribeReservedCacheNodesOfferings",
4515                &[("ReservedCacheNodesOfferingId", "missing")],
4516            ))
4517            .is_err());
4518    }
4519
4520    #[test]
4521    fn create_and_describe_user_group() {
4522        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4523        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4524        let service = ElastiCacheService::new(shared);
4525
4526        let req = request(
4527            "CreateUserGroup",
4528            &[
4529                ("UserGroupId", "mygroup"),
4530                ("Engine", "redis"),
4531                ("UserIds.member.1", "default"),
4532            ],
4533        );
4534        let resp = service.create_user_group(&req).unwrap();
4535        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
4536        assert!(body.contains("<UserGroupId>mygroup</UserGroupId>"));
4537        assert!(body.contains("<member>default</member>"));
4538
4539        let req = request("DescribeUserGroups", &[]);
4540        let resp = service.describe_user_groups(&req).unwrap();
4541        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
4542        assert!(body.contains("<UserGroupId>mygroup</UserGroupId>"));
4543    }
4544
4545    #[test]
4546    fn create_user_group_rejects_unknown_user() {
4547        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4548        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4549        let service = ElastiCacheService::new(shared);
4550
4551        let req = request(
4552            "CreateUserGroup",
4553            &[
4554                ("UserGroupId", "mygroup"),
4555                ("Engine", "redis"),
4556                ("UserIds.member.1", "nonexistent"),
4557            ],
4558        );
4559        assert!(service.create_user_group(&req).is_err());
4560    }
4561
4562    #[test]
4563    fn delete_user_group_removes_from_state() {
4564        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4565        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4566        let service = ElastiCacheService::new(shared);
4567
4568        let req = request(
4569            "CreateUserGroup",
4570            &[("UserGroupId", "delgroup"), ("Engine", "redis")],
4571        );
4572        service.create_user_group(&req).unwrap();
4573
4574        let req = request("DeleteUserGroup", &[("UserGroupId", "delgroup")]);
4575        let resp = service.delete_user_group(&req).unwrap();
4576        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
4577        assert!(body.contains("<Status>deleting</Status>"));
4578
4579        let req = request("DescribeUserGroups", &[("UserGroupId", "delgroup")]);
4580        assert!(service.describe_user_groups(&req).is_err());
4581    }
4582
4583    fn service_with_cache_cluster(cluster_id: &str) -> ElastiCacheService {
4584        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4585        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4586        {
4587            let mut s = shared.write();
4588            let arn = format!("arn:aws:elasticache:us-east-1:123456789012:cluster:{cluster_id}");
4589            s.tags.insert(arn.clone(), Vec::new());
4590            s.cache_clusters.insert(
4591                cluster_id.to_string(),
4592                CacheCluster {
4593                    cache_cluster_id: cluster_id.to_string(),
4594                    cache_node_type: "cache.t3.micro".to_string(),
4595                    engine: "redis".to_string(),
4596                    engine_version: "7.1".to_string(),
4597                    cache_cluster_status: "available".to_string(),
4598                    num_cache_nodes: 1,
4599                    preferred_availability_zone: "us-east-1a".to_string(),
4600                    cache_subnet_group_name: Some("default".to_string()),
4601                    auto_minor_version_upgrade: true,
4602                    arn,
4603                    created_at: "2024-01-01T00:00:00Z".to_string(),
4604                    endpoint_address: "127.0.0.1".to_string(),
4605                    endpoint_port: 6379,
4606                    container_id: "abc123".to_string(),
4607                    host_port: 6379,
4608                    replication_group_id: None,
4609                },
4610            );
4611        }
4612        ElastiCacheService::new(shared)
4613    }
4614
4615    #[test]
4616    fn describe_cache_clusters_returns_all() {
4617        let service = service_with_cache_cluster("cluster-a");
4618        {
4619            let mut state = service.state.write();
4620            let arn = "arn:aws:elasticache:us-east-1:123456789012:cluster:cluster-b".to_string();
4621            state.tags.insert(arn.clone(), Vec::new());
4622            state.cache_clusters.insert(
4623                "cluster-b".to_string(),
4624                CacheCluster {
4625                    cache_cluster_id: "cluster-b".to_string(),
4626                    cache_node_type: "cache.t3.micro".to_string(),
4627                    engine: "valkey".to_string(),
4628                    engine_version: "8.0".to_string(),
4629                    cache_cluster_status: "available".to_string(),
4630                    num_cache_nodes: 2,
4631                    preferred_availability_zone: "us-east-1b".to_string(),
4632                    cache_subnet_group_name: Some("default".to_string()),
4633                    auto_minor_version_upgrade: false,
4634                    arn,
4635                    created_at: "2024-01-02T00:00:00Z".to_string(),
4636                    endpoint_address: "127.0.0.1".to_string(),
4637                    endpoint_port: 6380,
4638                    container_id: "def456".to_string(),
4639                    host_port: 6380,
4640                    replication_group_id: None,
4641                },
4642            );
4643        }
4644
4645        let req = request("DescribeCacheClusters", &[]);
4646        let resp = service.describe_cache_clusters(&req).unwrap();
4647        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
4648        assert!(body.contains("<CacheClusterId>cluster-a</CacheClusterId>"));
4649        assert!(body.contains("<CacheClusterId>cluster-b</CacheClusterId>"));
4650        assert!(body.contains("<DescribeCacheClustersResponse"));
4651    }
4652
4653    #[tokio::test]
4654    async fn create_cache_cluster_validates_engine_before_runtime() {
4655        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4656        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4657        let service = ElastiCacheService::new(shared);
4658
4659        let req = request(
4660            "CreateCacheCluster",
4661            &[("CacheClusterId", "bad-engine"), ("Engine", "memcached")],
4662        );
4663        assert!(service.create_cache_cluster(&req).await.is_err());
4664    }
4665
4666    #[tokio::test]
4667    async fn create_cache_cluster_without_runtime_cancels_reservation() {
4668        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4669        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4670        let service = ElastiCacheService::new(shared.clone());
4671
4672        let req = request("CreateCacheCluster", &[("CacheClusterId", "no-runtime")]);
4673        assert!(service.create_cache_cluster(&req).await.is_err());
4674
4675        let mut state = shared.write();
4676        assert!(state.begin_cache_cluster_creation("no-runtime"));
4677    }
4678
4679    #[test]
4680    fn describe_cache_clusters_filters_by_id_and_shows_node_info() {
4681        let service = service_with_cache_cluster("nodeful-cluster");
4682        let req = request(
4683            "DescribeCacheClusters",
4684            &[
4685                ("CacheClusterId", "nodeful-cluster"),
4686                ("ShowCacheNodeInfo", "true"),
4687            ],
4688        );
4689        let resp = service.describe_cache_clusters(&req).unwrap();
4690        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
4691        assert!(body.contains("<CacheClusterId>nodeful-cluster</CacheClusterId>"));
4692        assert!(body.contains("<CacheNodes>"));
4693        assert!(body.contains("<CacheNodeId>0001</CacheNodeId>"));
4694        assert!(body.contains("<ParameterGroupStatus>in-sync</ParameterGroupStatus>"));
4695    }
4696
4697    #[test]
4698    fn describe_cache_clusters_not_found() {
4699        let service = service_with_cache_cluster("cluster-a");
4700        let req = request("DescribeCacheClusters", &[("CacheClusterId", "missing")]);
4701        assert!(service.describe_cache_clusters(&req).is_err());
4702    }
4703
4704    #[tokio::test]
4705    async fn delete_cache_cluster_removes_state_and_tags() {
4706        let service = service_with_cache_cluster("delete-me");
4707        let arn = "arn:aws:elasticache:us-east-1:123456789012:cluster:delete-me".to_string();
4708
4709        let req = request("DeleteCacheCluster", &[("CacheClusterId", "delete-me")]);
4710        let resp = service.delete_cache_cluster(&req).await.unwrap();
4711        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
4712        assert!(body.contains("<CacheClusterStatus>deleting</CacheClusterStatus>"));
4713        assert!(body.contains("<DeleteCacheClusterResponse"));
4714        assert!(!service
4715            .state
4716            .read()
4717            .cache_clusters
4718            .contains_key("delete-me"));
4719        assert!(!service.state.read().tags.contains_key(&arn));
4720    }
4721
4722    #[test]
4723    fn add_cluster_to_replication_group_updates_members_and_count() {
4724        let mut state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4725        state.replication_groups.insert(
4726            "rg-1".to_string(),
4727            ReplicationGroup {
4728                replication_group_id: "rg-1".to_string(),
4729                description: "test group".to_string(),
4730                global_replication_group_id: None,
4731                global_replication_group_role: None,
4732                status: "available".to_string(),
4733                cache_node_type: "cache.t3.micro".to_string(),
4734                engine: "redis".to_string(),
4735                engine_version: "7.1".to_string(),
4736                num_cache_clusters: 1,
4737                automatic_failover_enabled: false,
4738                endpoint_address: "127.0.0.1".to_string(),
4739                endpoint_port: 6379,
4740                arn: "arn:aws:elasticache:us-east-1:123456789012:replicationgroup:rg-1".to_string(),
4741                created_at: "2024-01-01T00:00:00Z".to_string(),
4742                container_id: "abc123".to_string(),
4743                host_port: 6379,
4744                member_clusters: vec!["rg-1-001".to_string()],
4745                snapshot_retention_limit: 0,
4746                snapshot_window: "05:00-09:00".to_string(),
4747            },
4748        );
4749
4750        add_cluster_to_replication_group(&mut state, "rg-1", "manual-cluster");
4751
4752        let group = state.replication_groups.get("rg-1").unwrap();
4753        assert_eq!(group.member_clusters, vec!["rg-1-001", "manual-cluster"]);
4754        assert_eq!(group.num_cache_clusters, 2);
4755    }
4756
4757    #[tokio::test]
4758    async fn delete_cache_cluster_removes_cluster_from_replication_group() {
4759        let service = service_with_cache_cluster("delete-rg-cluster");
4760        {
4761            let mut state = service.state.write();
4762            state
4763                .cache_clusters
4764                .get_mut("delete-rg-cluster")
4765                .unwrap()
4766                .replication_group_id = Some("delete-rg".to_string());
4767            state.replication_groups.insert(
4768                "delete-rg".to_string(),
4769                ReplicationGroup {
4770                    replication_group_id: "delete-rg".to_string(),
4771                    description: "test group".to_string(),
4772                    global_replication_group_id: None,
4773                    global_replication_group_role: None,
4774                    status: "available".to_string(),
4775                    cache_node_type: "cache.t3.micro".to_string(),
4776                    engine: "redis".to_string(),
4777                    engine_version: "7.1".to_string(),
4778                    num_cache_clusters: 2,
4779                    automatic_failover_enabled: false,
4780                    endpoint_address: "127.0.0.1".to_string(),
4781                    endpoint_port: 6379,
4782                    arn: "arn:aws:elasticache:us-east-1:123456789012:replicationgroup:delete-rg"
4783                        .to_string(),
4784                    created_at: "2024-01-01T00:00:00Z".to_string(),
4785                    container_id: "abc123".to_string(),
4786                    host_port: 6379,
4787                    member_clusters: vec![
4788                        "delete-rg-001".to_string(),
4789                        "delete-rg-cluster".to_string(),
4790                    ],
4791                    snapshot_retention_limit: 0,
4792                    snapshot_window: "05:00-09:00".to_string(),
4793                },
4794            );
4795        }
4796
4797        let req = request(
4798            "DeleteCacheCluster",
4799            &[("CacheClusterId", "delete-rg-cluster")],
4800        );
4801        service.delete_cache_cluster(&req).await.unwrap();
4802
4803        let group = service
4804            .state
4805            .read()
4806            .replication_groups
4807            .get("delete-rg")
4808            .unwrap()
4809            .clone();
4810        assert_eq!(group.member_clusters, vec!["delete-rg-001"]);
4811        assert_eq!(group.num_cache_clusters, 1);
4812    }
4813
4814    #[test]
4815    fn create_snapshot_rejects_standalone_cache_cluster_id() {
4816        let service = service_with_cache_cluster("standalone");
4817        let req = request(
4818            "CreateSnapshot",
4819            &[
4820                ("SnapshotName", "standalone-snap"),
4821                ("CacheClusterId", "standalone"),
4822            ],
4823        );
4824        assert!(service.create_snapshot(&req).is_err());
4825    }
4826
4827    fn service_with_replication_group(group_id: &str, num_clusters: i32) -> ElastiCacheService {
4828        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4829        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4830        {
4831            let mut s = shared.write();
4832            let member_clusters: Vec<String> = (1..=num_clusters)
4833                .map(|i| format!("{group_id}-{i:03}"))
4834                .collect();
4835            let arn =
4836                format!("arn:aws:elasticache:us-east-1:123456789012:replicationgroup:{group_id}");
4837            s.tags.insert(arn.clone(), Vec::new());
4838            s.replication_groups.insert(
4839                group_id.to_string(),
4840                ReplicationGroup {
4841                    replication_group_id: group_id.to_string(),
4842                    description: "test group".to_string(),
4843                    global_replication_group_id: None,
4844                    global_replication_group_role: None,
4845                    status: "available".to_string(),
4846                    cache_node_type: "cache.t3.micro".to_string(),
4847                    engine: "redis".to_string(),
4848                    engine_version: "7.1".to_string(),
4849                    num_cache_clusters: num_clusters,
4850                    automatic_failover_enabled: false,
4851                    endpoint_address: "127.0.0.1".to_string(),
4852                    endpoint_port: 6379,
4853                    arn,
4854                    created_at: "2024-01-01T00:00:00Z".to_string(),
4855                    container_id: "abc123".to_string(),
4856                    host_port: 6379,
4857                    member_clusters,
4858                    snapshot_retention_limit: 0,
4859                    snapshot_window: "05:00-09:00".to_string(),
4860                },
4861            );
4862        }
4863        ElastiCacheService::new(shared)
4864    }
4865
4866    fn service_with_serverless_cache(cache_name: &str) -> ElastiCacheService {
4867        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4868        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4869        {
4870            let mut s = shared.write();
4871            let arn =
4872                format!("arn:aws:elasticache:us-east-1:123456789012:serverlesscache:{cache_name}");
4873            s.tags.insert(arn.clone(), Vec::new());
4874            s.serverless_caches.insert(
4875                cache_name.to_string(),
4876                ServerlessCache {
4877                    serverless_cache_name: cache_name.to_string(),
4878                    description: "serverless cache".to_string(),
4879                    engine: "redis".to_string(),
4880                    major_engine_version: "7.1".to_string(),
4881                    full_engine_version: "7.1".to_string(),
4882                    status: "available".to_string(),
4883                    endpoint: ServerlessCacheEndpoint {
4884                        address: "127.0.0.1".to_string(),
4885                        port: 6379,
4886                    },
4887                    reader_endpoint: ServerlessCacheEndpoint {
4888                        address: "127.0.0.1".to_string(),
4889                        port: 6379,
4890                    },
4891                    arn,
4892                    created_at: "2024-01-01T00:00:00Z".to_string(),
4893                    cache_usage_limits: Some(ServerlessCacheUsageLimits {
4894                        data_storage: Some(ServerlessCacheDataStorage {
4895                            maximum: Some(10),
4896                            minimum: Some(1),
4897                            unit: Some("GB".to_string()),
4898                        }),
4899                        ecpu_per_second: Some(ServerlessCacheEcpuPerSecond {
4900                            maximum: Some(5000),
4901                            minimum: Some(1000),
4902                        }),
4903                    }),
4904                    security_group_ids: vec!["sg-123".to_string()],
4905                    subnet_ids: vec!["subnet-123".to_string()],
4906                    kms_key_id: Some("kms-123".to_string()),
4907                    user_group_id: None,
4908                    snapshot_retention_limit: Some(1),
4909                    daily_snapshot_time: Some("03:00".to_string()),
4910                    container_id: "cid".to_string(),
4911                    host_port: 6379,
4912                },
4913            );
4914        }
4915        ElastiCacheService::new(shared)
4916    }
4917
4918    fn service_with_global_replication_group(
4919        global_replication_group_id: &str,
4920        replication_group_id: &str,
4921    ) -> ElastiCacheService {
4922        let service = service_with_replication_group(replication_group_id, 1);
4923        {
4924            let mut state = service.state.write();
4925            state
4926                .replication_groups
4927                .get_mut(replication_group_id)
4928                .unwrap()
4929                .global_replication_group_id = Some(global_replication_group_id.to_string());
4930            state
4931                .replication_groups
4932                .get_mut(replication_group_id)
4933                .unwrap()
4934                .global_replication_group_role = Some("primary".to_string());
4935            state.global_replication_groups.insert(
4936                global_replication_group_id.to_string(),
4937                GlobalReplicationGroup {
4938                    global_replication_group_id: global_replication_group_id.to_string(),
4939                    global_replication_group_description: "global test group".to_string(),
4940                    status: "available".to_string(),
4941                    cache_node_type: "cache.t3.micro".to_string(),
4942                    engine: "redis".to_string(),
4943                    engine_version: "7.1".to_string(),
4944                    members: vec![GlobalReplicationGroupMember {
4945                        replication_group_id: replication_group_id.to_string(),
4946                        replication_group_region: "us-east-1".to_string(),
4947                        role: "primary".to_string(),
4948                        automatic_failover: false,
4949                        status: "associated".to_string(),
4950                    }],
4951                    cluster_enabled: false,
4952                    arn: format!(
4953                        "arn:aws:elasticache:us-east-1:123456789012:globalreplicationgroup:{global_replication_group_id}"
4954                    ),
4955                },
4956            );
4957        }
4958        service
4959    }
4960
4961    #[test]
4962    fn create_global_replication_group_registers_metadata_and_updates_primary_group() {
4963        let service = service_with_replication_group("primary-rg", 1);
4964        let req = request(
4965            "CreateGlobalReplicationGroup",
4966            &[
4967                ("GlobalReplicationGroupIdSuffix", "global-a"),
4968                ("PrimaryReplicationGroupId", "primary-rg"),
4969                ("GlobalReplicationGroupDescription", "global slice"),
4970            ],
4971        );
4972
4973        let resp = service.create_global_replication_group(&req).unwrap();
4974        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
4975        assert!(body.contains(
4976            "<GlobalReplicationGroupDescription>global slice</GlobalReplicationGroupDescription>"
4977        ));
4978        assert!(body.contains("<ReplicationGroupId>primary-rg</ReplicationGroupId>"));
4979        assert!(body.contains("<Role>primary</Role>"));
4980
4981        let state = service.state.read();
4982        let primary_group = state.replication_groups.get("primary-rg").unwrap();
4983        assert_eq!(
4984            primary_group.global_replication_group_id.as_deref(),
4985            Some("fc-us-east-1-global-a")
4986        );
4987        assert_eq!(
4988            primary_group.global_replication_group_role.as_deref(),
4989            Some("primary")
4990        );
4991        assert!(state
4992            .global_replication_groups
4993            .contains_key("fc-us-east-1-global-a"));
4994    }
4995
4996    #[test]
4997    fn describe_global_replication_groups_filters_by_id() {
4998        let service = service_with_global_replication_group("fc-us-east-1-global-a", "primary-rg");
4999        let req = request(
5000            "DescribeGlobalReplicationGroups",
5001            &[
5002                ("GlobalReplicationGroupId", "fc-us-east-1-global-a"),
5003                ("ShowMemberInfo", "true"),
5004            ],
5005        );
5006
5007        let resp = service.describe_global_replication_groups(&req).unwrap();
5008        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5009        assert!(body.contains(
5010            "<GlobalReplicationGroupId>fc-us-east-1-global-a</GlobalReplicationGroupId>"
5011        ));
5012        assert!(body.contains("<ReplicationGroupId>primary-rg</ReplicationGroupId>"));
5013        assert!(body.contains("<DescribeGlobalReplicationGroupsResponse"));
5014    }
5015
5016    #[test]
5017    fn modify_global_replication_group_updates_primary_replication_group_state() {
5018        let service = service_with_global_replication_group("fc-us-east-1-global-a", "primary-rg");
5019        let req = request(
5020            "ModifyGlobalReplicationGroup",
5021            &[
5022                ("GlobalReplicationGroupId", "fc-us-east-1-global-a"),
5023                ("ApplyImmediately", "true"),
5024                ("GlobalReplicationGroupDescription", "updated"),
5025                ("CacheNodeType", "cache.m5.large"),
5026                ("EngineVersion", "7.2"),
5027                ("AutomaticFailoverEnabled", "true"),
5028            ],
5029        );
5030
5031        let resp = service.modify_global_replication_group(&req).unwrap();
5032        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5033        assert!(body.contains(
5034            "<GlobalReplicationGroupDescription>updated</GlobalReplicationGroupDescription>"
5035        ));
5036        assert!(body.contains("<CacheNodeType>cache.m5.large</CacheNodeType>"));
5037        assert!(body.contains("<EngineVersion>7.2</EngineVersion>"));
5038
5039        let state = service.state.read();
5040        let primary_group = state.replication_groups.get("primary-rg").unwrap();
5041        assert_eq!(primary_group.cache_node_type, "cache.m5.large");
5042        assert_eq!(primary_group.engine_version, "7.2");
5043        assert!(primary_group.automatic_failover_enabled);
5044    }
5045
5046    #[test]
5047    fn delete_global_replication_group_clears_primary_group_association() {
5048        let service = service_with_global_replication_group("fc-us-east-1-global-a", "primary-rg");
5049        let req = request(
5050            "DeleteGlobalReplicationGroup",
5051            &[
5052                ("GlobalReplicationGroupId", "fc-us-east-1-global-a"),
5053                ("RetainPrimaryReplicationGroup", "true"),
5054            ],
5055        );
5056
5057        let resp = service.delete_global_replication_group(&req).unwrap();
5058        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5059        assert!(body.contains("<Status>deleting</Status>"));
5060
5061        let state = service.state.read();
5062        assert!(!state
5063            .global_replication_groups
5064            .contains_key("fc-us-east-1-global-a"));
5065        let primary_group = state.replication_groups.get("primary-rg").unwrap();
5066        assert!(primary_group.global_replication_group_id.is_none());
5067        assert!(primary_group.global_replication_group_role.is_none());
5068    }
5069
5070    #[test]
5071    fn replication_group_xml_includes_global_replication_group_info() {
5072        let service = service_with_global_replication_group("fc-us-east-1-global-a", "primary-rg");
5073        let req = request(
5074            "DescribeReplicationGroups",
5075            &[("ReplicationGroupId", "primary-rg")],
5076        );
5077
5078        let resp = service.describe_replication_groups(&req).unwrap();
5079        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5080        assert!(body.contains("<GlobalReplicationGroupInfo>"));
5081        assert!(body.contains(
5082            "<GlobalReplicationGroupId>fc-us-east-1-global-a</GlobalReplicationGroupId>"
5083        ));
5084        assert!(body.contains(
5085            "<GlobalReplicationGroupMemberRole>primary</GlobalReplicationGroupMemberRole>"
5086        ));
5087    }
5088
5089    #[test]
5090    fn failover_global_replication_group_returns_current_primary() {
5091        let service = service_with_global_replication_group("fc-us-east-1-global-a", "primary-rg");
5092        let req = request(
5093            "FailoverGlobalReplicationGroup",
5094            &[
5095                ("GlobalReplicationGroupId", "fc-us-east-1-global-a"),
5096                ("PrimaryRegion", "us-east-1"),
5097                ("PrimaryReplicationGroupId", "primary-rg"),
5098            ],
5099        );
5100
5101        let resp = service.failover_global_replication_group(&req).unwrap();
5102        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5103        assert!(body.contains("<ReplicationGroupId>primary-rg</ReplicationGroupId>"));
5104        assert!(body.contains("<FailoverGlobalReplicationGroupResponse"));
5105    }
5106
5107    #[test]
5108    fn disassociate_global_replication_group_accepts_current_primary_as_noop() {
5109        let service = service_with_global_replication_group("fc-us-east-1-global-a", "primary-rg");
5110        let req = request(
5111            "DisassociateGlobalReplicationGroup",
5112            &[
5113                ("GlobalReplicationGroupId", "fc-us-east-1-global-a"),
5114                ("ReplicationGroupId", "primary-rg"),
5115                ("ReplicationGroupRegion", "us-east-1"),
5116            ],
5117        );
5118
5119        let resp = service.disassociate_global_replication_group(&req).unwrap();
5120        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5121        assert!(body.contains("<ReplicationGroupId>primary-rg</ReplicationGroupId>"));
5122        assert!(body.contains("<DisassociateGlobalReplicationGroupResponse"));
5123    }
5124
5125    #[test]
5126    fn modify_replication_group_updates_description() {
5127        let service = service_with_replication_group("my-rg", 1);
5128        let req = request(
5129            "ModifyReplicationGroup",
5130            &[
5131                ("ReplicationGroupId", "my-rg"),
5132                ("ReplicationGroupDescription", "Updated description"),
5133            ],
5134        );
5135        let resp = service.modify_replication_group(&req).unwrap();
5136        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5137        assert!(body.contains("<Description>Updated description</Description>"));
5138        assert!(body.contains("<ModifyReplicationGroupResponse"));
5139    }
5140
5141    #[test]
5142    fn modify_replication_group_updates_multiple_fields() {
5143        let service = service_with_replication_group("my-rg", 1);
5144        let req = request(
5145            "ModifyReplicationGroup",
5146            &[
5147                ("ReplicationGroupId", "my-rg"),
5148                ("CacheNodeType", "cache.m5.large"),
5149                ("AutomaticFailoverEnabled", "true"),
5150                ("SnapshotRetentionLimit", "5"),
5151                ("SnapshotWindow", "02:00-06:00"),
5152            ],
5153        );
5154        let resp = service.modify_replication_group(&req).unwrap();
5155        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5156        assert!(body.contains("<CacheNodeType>cache.m5.large</CacheNodeType>"));
5157        assert!(body.contains("<AutomaticFailover>enabled</AutomaticFailover>"));
5158        assert!(body.contains("<SnapshotRetentionLimit>5</SnapshotRetentionLimit>"));
5159        assert!(body.contains("<SnapshotWindow>02:00-06:00</SnapshotWindow>"));
5160    }
5161
5162    #[test]
5163    fn modify_replication_group_not_found() {
5164        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
5165        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
5166        let service = ElastiCacheService::new(shared);
5167        let req = request(
5168            "ModifyReplicationGroup",
5169            &[("ReplicationGroupId", "nonexistent")],
5170        );
5171        assert!(service.modify_replication_group(&req).is_err());
5172    }
5173
5174    #[test]
5175    fn parse_cache_usage_limits_reads_nested_query_shape() {
5176        let req = request(
5177            "CreateServerlessCache",
5178            &[
5179                ("CacheUsageLimits.DataStorage.Maximum", "10"),
5180                ("CacheUsageLimits.DataStorage.Minimum", "2"),
5181                ("CacheUsageLimits.DataStorage.Unit", "GB"),
5182                ("CacheUsageLimits.ECPUPerSecond.Maximum", "5000"),
5183                ("CacheUsageLimits.ECPUPerSecond.Minimum", "1000"),
5184            ],
5185        );
5186
5187        let limits = parse_cache_usage_limits(&req).unwrap().unwrap();
5188        let data_storage = limits.data_storage.unwrap();
5189        assert_eq!(data_storage.maximum, Some(10));
5190        assert_eq!(data_storage.minimum, Some(2));
5191        assert_eq!(data_storage.unit.as_deref(), Some("GB"));
5192
5193        let ecpu = limits.ecpu_per_second.unwrap();
5194        assert_eq!(ecpu.maximum, Some(5000));
5195        assert_eq!(ecpu.minimum, Some(1000));
5196    }
5197
5198    #[test]
5199    fn serverless_cache_xml_contains_expected_fields() {
5200        let cache = service_with_serverless_cache("cache-a")
5201            .state
5202            .read()
5203            .serverless_caches["cache-a"]
5204            .clone();
5205
5206        let xml = serverless_cache_xml(&cache);
5207        assert!(xml.contains("<ServerlessCacheName>cache-a</ServerlessCacheName>"));
5208        assert!(xml.contains("<Engine>redis</Engine>"));
5209        assert!(xml.contains("<MajorEngineVersion>7.1</MajorEngineVersion>"));
5210        assert!(xml.contains("<Endpoint><Address>127.0.0.1</Address><Port>6379</Port></Endpoint>"));
5211        assert!(xml.contains(
5212            "<ReaderEndpoint><Address>127.0.0.1</Address><Port>6379</Port></ReaderEndpoint>"
5213        ));
5214        assert!(xml.contains(
5215            "<SecurityGroupIds><SecurityGroupId>sg-123</SecurityGroupId></SecurityGroupIds>"
5216        ));
5217        assert!(xml.contains("<SubnetIds><member>subnet-123</member></SubnetIds>"));
5218        assert!(xml.contains("<CacheUsageLimits>"));
5219    }
5220
5221    #[test]
5222    fn serverless_cache_snapshot_xml_contains_expected_fields() {
5223        let snapshot = ServerlessCacheSnapshot {
5224            serverless_cache_snapshot_name: "snap-a".to_string(),
5225            arn: "arn:aws:elasticache:us-east-1:123456789012:serverlesssnapshot:snap-a".to_string(),
5226            kms_key_id: Some("kms-123".to_string()),
5227            snapshot_type: "manual".to_string(),
5228            status: "available".to_string(),
5229            create_time: "2024-01-01T00:00:00Z".to_string(),
5230            expiry_time: None,
5231            bytes_used_for_cache: Some("0".to_string()),
5232            serverless_cache_name: "cache-a".to_string(),
5233            engine: "redis".to_string(),
5234            major_engine_version: "7.1".to_string(),
5235        };
5236
5237        let xml = serverless_cache_snapshot_xml(&snapshot);
5238        assert!(xml.contains("<ServerlessCacheSnapshotName>snap-a</ServerlessCacheSnapshotName>"));
5239        assert!(xml.contains("<KmsKeyId>kms-123</KmsKeyId>"));
5240        assert!(xml.contains("<SnapshotType>manual</SnapshotType>"));
5241        assert!(xml.contains("<ServerlessCacheConfiguration>"));
5242        assert!(xml.contains("<ServerlessCacheName>cache-a</ServerlessCacheName>"));
5243    }
5244
5245    #[test]
5246    fn describe_serverless_caches_returns_all() {
5247        let service = service_with_serverless_cache("cache-a");
5248        {
5249            let mut state = service.state.write();
5250            state.serverless_caches.insert(
5251                "cache-b".to_string(),
5252                ServerlessCache {
5253                    serverless_cache_name: "cache-b".to_string(),
5254                    description: "serverless cache".to_string(),
5255                    engine: "valkey".to_string(),
5256                    major_engine_version: "8.0".to_string(),
5257                    full_engine_version: "8.0".to_string(),
5258                    status: "available".to_string(),
5259                    endpoint: ServerlessCacheEndpoint {
5260                        address: "127.0.0.1".to_string(),
5261                        port: 6380,
5262                    },
5263                    reader_endpoint: ServerlessCacheEndpoint {
5264                        address: "127.0.0.1".to_string(),
5265                        port: 6380,
5266                    },
5267                    arn: "arn:aws:elasticache:us-east-1:123456789012:serverlesscache:cache-b"
5268                        .to_string(),
5269                    created_at: "2024-01-02T00:00:00Z".to_string(),
5270                    cache_usage_limits: None,
5271                    security_group_ids: Vec::new(),
5272                    subnet_ids: Vec::new(),
5273                    kms_key_id: None,
5274                    user_group_id: None,
5275                    snapshot_retention_limit: None,
5276                    daily_snapshot_time: None,
5277                    container_id: "cid".to_string(),
5278                    host_port: 6380,
5279                },
5280            );
5281        }
5282
5283        let resp = service
5284            .describe_serverless_caches(&request("DescribeServerlessCaches", &[]))
5285            .unwrap();
5286        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5287        assert!(body.contains("<ServerlessCacheName>cache-a</ServerlessCacheName>"));
5288        assert!(body.contains("<ServerlessCacheName>cache-b</ServerlessCacheName>"));
5289    }
5290
5291    #[test]
5292    fn modify_serverless_cache_updates_fields() {
5293        let service = service_with_serverless_cache("cache-a");
5294        let req = request(
5295            "ModifyServerlessCache",
5296            &[
5297                ("ServerlessCacheName", "cache-a"),
5298                ("Description", "updated"),
5299                ("SecurityGroupIds.SecurityGroupId.1", "sg-999"),
5300                ("SnapshotRetentionLimit", "7"),
5301                ("DailySnapshotTime", "05:00"),
5302            ],
5303        );
5304
5305        let resp = service.modify_serverless_cache(&req).unwrap();
5306        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5307        assert!(body.contains("<Description>updated</Description>"));
5308        assert!(body.contains(
5309            "<SecurityGroupIds><SecurityGroupId>sg-999</SecurityGroupId></SecurityGroupIds>"
5310        ));
5311        assert!(body.contains("<SnapshotRetentionLimit>7</SnapshotRetentionLimit>"));
5312        assert!(body.contains("<DailySnapshotTime>05:00</DailySnapshotTime>"));
5313    }
5314
5315    #[test]
5316    fn parse_query_list_param_reads_indexed_and_flat_query_values() {
5317        let req = AwsRequest {
5318            service: "elasticache".to_string(),
5319            action: "ModifyServerlessCache".to_string(),
5320            region: "us-east-1".to_string(),
5321            account_id: "000000000000".to_string(),
5322            request_id: "req-1".to_string(),
5323            headers: HeaderMap::new(),
5324            query_params: HashMap::from([
5325                ("SecurityGroupIds.member.1".to_string(), "sg-a".to_string()),
5326                ("SecurityGroupIds.member.2".to_string(), "sg-b".to_string()),
5327            ]),
5328            body: Bytes::new(),
5329            path_segments: vec![],
5330            raw_path: "/".to_string(),
5331            raw_query: String::new(),
5332            method: Method::POST,
5333            is_query_protocol: true,
5334            access_key_id: None,
5335            principal: None,
5336        };
5337        assert_eq!(
5338            parse_query_list_param(&req, "SecurityGroupIds", "SecurityGroupId"),
5339            vec!["sg-a".to_string(), "sg-b".to_string()]
5340        );
5341
5342        let req = AwsRequest {
5343            query_params: HashMap::from([("SecurityGroupIds".to_string(), "sg-flat".to_string())]),
5344            ..req
5345        };
5346        assert_eq!(
5347            parse_query_list_param(&req, "SecurityGroupIds", "SecurityGroupId"),
5348            vec!["sg-flat".to_string()]
5349        );
5350    }
5351
5352    #[test]
5353    fn describe_serverless_cache_snapshots_filters_by_cache_name() {
5354        let service = service_with_serverless_cache("cache-a");
5355        {
5356            let mut state = service.state.write();
5357            state.serverless_cache_snapshots.insert(
5358                "snap-a".to_string(),
5359                ServerlessCacheSnapshot {
5360                    serverless_cache_snapshot_name: "snap-a".to_string(),
5361                    arn: "arn:aws:elasticache:us-east-1:123456789012:serverlesssnapshot:snap-a"
5362                        .to_string(),
5363                    kms_key_id: None,
5364                    snapshot_type: "manual".to_string(),
5365                    status: "available".to_string(),
5366                    create_time: "2024-01-01T00:00:00Z".to_string(),
5367                    expiry_time: None,
5368                    bytes_used_for_cache: None,
5369                    serverless_cache_name: "cache-a".to_string(),
5370                    engine: "redis".to_string(),
5371                    major_engine_version: "7.1".to_string(),
5372                },
5373            );
5374        }
5375
5376        let resp = service
5377            .describe_serverless_cache_snapshots(&request(
5378                "DescribeServerlessCacheSnapshots",
5379                &[("ServerlessCacheName", "cache-a")],
5380            ))
5381            .unwrap();
5382        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5383        assert!(body.contains("<ServerlessCacheSnapshotName>snap-a</ServerlessCacheSnapshotName>"));
5384    }
5385
5386    #[test]
5387    fn delete_serverless_cache_snapshot_removes_tags() {
5388        let service = service_with_serverless_cache("cache-a");
5389        {
5390            let mut state = service.state.write();
5391            let arn =
5392                "arn:aws:elasticache:us-east-1:123456789012:serverlesssnapshot:snap-a".to_string();
5393            state.tags.insert(arn.clone(), Vec::new());
5394            state.serverless_cache_snapshots.insert(
5395                "snap-a".to_string(),
5396                ServerlessCacheSnapshot {
5397                    serverless_cache_snapshot_name: "snap-a".to_string(),
5398                    arn,
5399                    kms_key_id: None,
5400                    snapshot_type: "manual".to_string(),
5401                    status: "available".to_string(),
5402                    create_time: "2024-01-01T00:00:00Z".to_string(),
5403                    expiry_time: None,
5404                    bytes_used_for_cache: None,
5405                    serverless_cache_name: "cache-a".to_string(),
5406                    engine: "redis".to_string(),
5407                    major_engine_version: "7.1".to_string(),
5408                },
5409            );
5410        }
5411
5412        let resp = service
5413            .delete_serverless_cache_snapshot(&request(
5414                "DeleteServerlessCacheSnapshot",
5415                &[("ServerlessCacheSnapshotName", "snap-a")],
5416            ))
5417            .unwrap();
5418        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5419        assert!(body.contains("<Status>deleting</Status>"));
5420        assert!(!service
5421            .state
5422            .read()
5423            .tags
5424            .contains_key("arn:aws:elasticache:us-east-1:123456789012:serverlesssnapshot:snap-a"));
5425    }
5426
5427    #[test]
5428    fn increase_replica_count_updates_member_clusters() {
5429        let service = service_with_replication_group("my-rg", 1);
5430        let req = request(
5431            "IncreaseReplicaCount",
5432            &[
5433                ("ReplicationGroupId", "my-rg"),
5434                ("ApplyImmediately", "true"),
5435                ("NewReplicaCount", "2"),
5436            ],
5437        );
5438        let resp = service.increase_replica_count(&req).unwrap();
5439        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5440        assert!(body.contains("<ClusterId>my-rg-001</ClusterId>"));
5441        assert!(body.contains("<ClusterId>my-rg-002</ClusterId>"));
5442        assert!(body.contains("<ClusterId>my-rg-003</ClusterId>"));
5443        assert!(body.contains("<IncreaseReplicaCountResponse"));
5444    }
5445
5446    #[test]
5447    fn increase_replica_count_rejects_same_or_lower() {
5448        let service = service_with_replication_group("my-rg", 3);
5449        let req = request(
5450            "IncreaseReplicaCount",
5451            &[
5452                ("ReplicationGroupId", "my-rg"),
5453                ("ApplyImmediately", "true"),
5454                ("NewReplicaCount", "2"),
5455            ],
5456        );
5457        assert!(service.increase_replica_count(&req).is_err());
5458    }
5459
5460    #[test]
5461    fn increase_replica_count_not_found() {
5462        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
5463        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
5464        let service = ElastiCacheService::new(shared);
5465        let req = request(
5466            "IncreaseReplicaCount",
5467            &[
5468                ("ReplicationGroupId", "nonexistent"),
5469                ("ApplyImmediately", "true"),
5470                ("NewReplicaCount", "2"),
5471            ],
5472        );
5473        assert!(service.increase_replica_count(&req).is_err());
5474    }
5475
5476    #[test]
5477    fn decrease_replica_count_updates_member_clusters() {
5478        let service = service_with_replication_group("my-rg", 3);
5479        let req = request(
5480            "DecreaseReplicaCount",
5481            &[
5482                ("ReplicationGroupId", "my-rg"),
5483                ("ApplyImmediately", "true"),
5484                ("NewReplicaCount", "1"),
5485            ],
5486        );
5487        let resp = service.decrease_replica_count(&req).unwrap();
5488        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5489        assert!(body.contains("<ClusterId>my-rg-001</ClusterId>"));
5490        assert!(body.contains("<ClusterId>my-rg-002</ClusterId>"));
5491        assert!(!body.contains("<ClusterId>my-rg-003</ClusterId>"));
5492        assert!(body.contains("<DecreaseReplicaCountResponse"));
5493    }
5494
5495    #[test]
5496    fn decrease_replica_count_validates_minimum() {
5497        let service = service_with_replication_group("my-rg", 1);
5498        // NewReplicaCount=0 means total=1, which is not fewer than current 1
5499        let req = request(
5500            "DecreaseReplicaCount",
5501            &[
5502                ("ReplicationGroupId", "my-rg"),
5503                ("ApplyImmediately", "true"),
5504                ("NewReplicaCount", "0"),
5505            ],
5506        );
5507        assert!(service.decrease_replica_count(&req).is_err());
5508    }
5509
5510    #[test]
5511    fn decrease_replica_count_rejects_negative() {
5512        let service = service_with_replication_group("my-rg", 2);
5513        let req = request(
5514            "DecreaseReplicaCount",
5515            &[
5516                ("ReplicationGroupId", "my-rg"),
5517                ("ApplyImmediately", "true"),
5518                ("NewReplicaCount", "-1"),
5519            ],
5520        );
5521        assert!(service.decrease_replica_count(&req).is_err());
5522    }
5523
5524    #[test]
5525    fn test_failover_validates_node_group() {
5526        let service = service_with_replication_group("my-rg", 1);
5527        let req = request(
5528            "TestFailover",
5529            &[("ReplicationGroupId", "my-rg"), ("NodeGroupId", "0001")],
5530        );
5531        let resp = service.test_failover(&req).unwrap();
5532        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5533        assert!(body.contains("<Status>available</Status>"));
5534        assert!(body.contains("<TestFailoverResponse"));
5535    }
5536
5537    #[test]
5538    fn test_failover_rejects_invalid_node_group() {
5539        let service = service_with_replication_group("my-rg", 1);
5540        let req = request(
5541            "TestFailover",
5542            &[("ReplicationGroupId", "my-rg"), ("NodeGroupId", "9999")],
5543        );
5544        assert!(service.test_failover(&req).is_err());
5545    }
5546
5547    #[test]
5548    fn test_failover_not_found() {
5549        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
5550        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
5551        let service = ElastiCacheService::new(shared);
5552        let req = request(
5553            "TestFailover",
5554            &[
5555                ("ReplicationGroupId", "nonexistent"),
5556                ("NodeGroupId", "0001"),
5557            ],
5558        );
5559        assert!(service.test_failover(&req).is_err());
5560    }
5561
5562    // Snapshot tests
5563
5564    #[test]
5565    fn create_snapshot_returns_snapshot_xml() {
5566        let service = service_with_replication_group("snap-rg", 1);
5567        let req = request(
5568            "CreateSnapshot",
5569            &[
5570                ("SnapshotName", "my-snap"),
5571                ("ReplicationGroupId", "snap-rg"),
5572            ],
5573        );
5574        let resp = service.create_snapshot(&req).unwrap();
5575        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5576        assert!(body.contains("<SnapshotName>my-snap</SnapshotName>"));
5577        assert!(body.contains("<ReplicationGroupId>snap-rg</ReplicationGroupId>"));
5578        assert!(body.contains("<SnapshotStatus>available</SnapshotStatus>"));
5579        assert!(body.contains("<SnapshotSource>manual</SnapshotSource>"));
5580        assert!(body.contains("<Engine>redis</Engine>"));
5581        assert!(body.contains("<CreateSnapshotResponse"));
5582    }
5583
5584    #[test]
5585    fn create_snapshot_via_cache_cluster_id() {
5586        let service = service_with_replication_group("cc-rg", 2);
5587        let req = request(
5588            "CreateSnapshot",
5589            &[
5590                ("SnapshotName", "cluster-snap"),
5591                ("CacheClusterId", "cc-rg-001"),
5592            ],
5593        );
5594        let resp = service.create_snapshot(&req).unwrap();
5595        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5596        assert!(body.contains("<ReplicationGroupId>cc-rg</ReplicationGroupId>"));
5597    }
5598
5599    #[test]
5600    fn create_snapshot_rejects_missing_group_and_cluster() {
5601        let service = service_with_replication_group("rg", 1);
5602        let req = request("CreateSnapshot", &[("SnapshotName", "bad-snap")]);
5603        assert!(service.create_snapshot(&req).is_err());
5604    }
5605
5606    #[test]
5607    fn create_snapshot_rejects_duplicate_name() {
5608        let service = service_with_replication_group("dup-rg", 1);
5609        let req = request(
5610            "CreateSnapshot",
5611            &[
5612                ("SnapshotName", "dup-snap"),
5613                ("ReplicationGroupId", "dup-rg"),
5614            ],
5615        );
5616        service.create_snapshot(&req).unwrap();
5617        assert!(service.create_snapshot(&req).is_err());
5618    }
5619
5620    #[test]
5621    fn create_snapshot_rejects_nonexistent_group() {
5622        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
5623        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
5624        let service = ElastiCacheService::new(shared);
5625        let req = request(
5626            "CreateSnapshot",
5627            &[
5628                ("SnapshotName", "orphan"),
5629                ("ReplicationGroupId", "no-such-rg"),
5630            ],
5631        );
5632        assert!(service.create_snapshot(&req).is_err());
5633    }
5634
5635    #[test]
5636    fn create_snapshot_rejects_missing_name() {
5637        let service = service_with_replication_group("rg", 1);
5638        let req = request("CreateSnapshot", &[("ReplicationGroupId", "rg")]);
5639        assert!(service.create_snapshot(&req).is_err());
5640    }
5641
5642    #[test]
5643    fn create_snapshot_registers_arn_for_tags() {
5644        let service = service_with_replication_group("tag-rg", 1);
5645        let req = request(
5646            "CreateSnapshot",
5647            &[
5648                ("SnapshotName", "tag-snap"),
5649                ("ReplicationGroupId", "tag-rg"),
5650            ],
5651        );
5652        service.create_snapshot(&req).unwrap();
5653
5654        let state = service.state.read();
5655        let arn = "arn:aws:elasticache:us-east-1:123456789012:snapshot:tag-snap".to_string();
5656        assert!(state.tags.contains_key(&arn));
5657    }
5658
5659    #[test]
5660    fn describe_snapshots_returns_all() {
5661        let service = service_with_replication_group("desc-rg", 1);
5662        for name in &["snap-a", "snap-b"] {
5663            let req = request(
5664                "CreateSnapshot",
5665                &[("SnapshotName", name), ("ReplicationGroupId", "desc-rg")],
5666            );
5667            service.create_snapshot(&req).unwrap();
5668        }
5669        let req = request("DescribeSnapshots", &[]);
5670        let resp = service.describe_snapshots(&req).unwrap();
5671        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5672        assert!(body.contains("<SnapshotName>snap-a</SnapshotName>"));
5673        assert!(body.contains("<SnapshotName>snap-b</SnapshotName>"));
5674        assert!(body.contains("<DescribeSnapshotsResponse"));
5675    }
5676
5677    #[test]
5678    fn describe_snapshots_filters_by_name() {
5679        let service = service_with_replication_group("filt-rg", 1);
5680        for name in &["snap-1", "snap-2"] {
5681            let req = request(
5682                "CreateSnapshot",
5683                &[("SnapshotName", name), ("ReplicationGroupId", "filt-rg")],
5684            );
5685            service.create_snapshot(&req).unwrap();
5686        }
5687        let req = request("DescribeSnapshots", &[("SnapshotName", "snap-1")]);
5688        let resp = service.describe_snapshots(&req).unwrap();
5689        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5690        assert!(body.contains("<SnapshotName>snap-1</SnapshotName>"));
5691        assert!(!body.contains("<SnapshotName>snap-2</SnapshotName>"));
5692    }
5693
5694    #[test]
5695    fn describe_snapshots_filters_by_replication_group() {
5696        let service = service_with_replication_group("rg-a", 1);
5697        let req = request(
5698            "CreateSnapshot",
5699            &[
5700                ("SnapshotName", "rg-a-snap"),
5701                ("ReplicationGroupId", "rg-a"),
5702            ],
5703        );
5704        service.create_snapshot(&req).unwrap();
5705
5706        let req = request("DescribeSnapshots", &[("ReplicationGroupId", "rg-a")]);
5707        let resp = service.describe_snapshots(&req).unwrap();
5708        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5709        assert!(body.contains("<SnapshotName>rg-a-snap</SnapshotName>"));
5710
5711        // Filter by non-matching group returns empty
5712        let req = request("DescribeSnapshots", &[("ReplicationGroupId", "rg-b")]);
5713        let resp = service.describe_snapshots(&req).unwrap();
5714        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5715        assert!(!body.contains("<SnapshotName>"));
5716    }
5717
5718    #[test]
5719    fn describe_snapshots_not_found_by_name() {
5720        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
5721        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
5722        let service = ElastiCacheService::new(shared);
5723        let req = request("DescribeSnapshots", &[("SnapshotName", "nope")]);
5724        assert!(service.describe_snapshots(&req).is_err());
5725    }
5726
5727    #[test]
5728    fn delete_snapshot_removes_and_returns_deleting() {
5729        let service = service_with_replication_group("del-rg", 1);
5730        let req = request(
5731            "CreateSnapshot",
5732            &[
5733                ("SnapshotName", "del-snap"),
5734                ("ReplicationGroupId", "del-rg"),
5735            ],
5736        );
5737        service.create_snapshot(&req).unwrap();
5738
5739        let req = request("DeleteSnapshot", &[("SnapshotName", "del-snap")]);
5740        let resp = service.delete_snapshot(&req).unwrap();
5741        let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5742        assert!(body.contains("<SnapshotStatus>deleting</SnapshotStatus>"));
5743        assert!(body.contains("<DeleteSnapshotResponse"));
5744
5745        // Verify it's gone
5746        assert!(!service.state.read().snapshots.contains_key("del-snap"));
5747    }
5748
5749    #[test]
5750    fn delete_snapshot_cleans_up_tags() {
5751        let service = service_with_replication_group("tag-del-rg", 1);
5752        let req = request(
5753            "CreateSnapshot",
5754            &[
5755                ("SnapshotName", "tag-del-snap"),
5756                ("ReplicationGroupId", "tag-del-rg"),
5757            ],
5758        );
5759        service.create_snapshot(&req).unwrap();
5760
5761        let arn = "arn:aws:elasticache:us-east-1:123456789012:snapshot:tag-del-snap".to_string();
5762        assert!(service.state.read().tags.contains_key(&arn));
5763
5764        let req = request("DeleteSnapshot", &[("SnapshotName", "tag-del-snap")]);
5765        service.delete_snapshot(&req).unwrap();
5766        assert!(!service.state.read().tags.contains_key(&arn));
5767    }
5768
5769    #[test]
5770    fn delete_snapshot_not_found() {
5771        let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
5772        let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
5773        let service = ElastiCacheService::new(shared);
5774        let req = request("DeleteSnapshot", &[("SnapshotName", "nope")]);
5775        assert!(service.delete_snapshot(&req).is_err());
5776    }
5777
5778    #[test]
5779    fn snapshot_xml_contains_all_fields() {
5780        let snap = CacheSnapshot {
5781            snapshot_name: "test-snap".to_string(),
5782            replication_group_id: "rg-1".to_string(),
5783            replication_group_description: "desc".to_string(),
5784            snapshot_status: "available".to_string(),
5785            cache_node_type: "cache.t3.micro".to_string(),
5786            engine: "redis".to_string(),
5787            engine_version: "7.1".to_string(),
5788            num_cache_clusters: 2,
5789            arn: "arn:aws:elasticache:us-east-1:123:snapshot:test-snap".to_string(),
5790            created_at: "2024-01-01T00:00:00Z".to_string(),
5791            snapshot_source: "manual".to_string(),
5792        };
5793        let xml = snapshot_xml(&snap);
5794        assert!(xml.contains("<SnapshotName>test-snap</SnapshotName>"));
5795        assert!(xml.contains("<ReplicationGroupId>rg-1</ReplicationGroupId>"));
5796        assert!(xml.contains("<SnapshotStatus>available</SnapshotStatus>"));
5797        assert!(xml.contains("<SnapshotSource>manual</SnapshotSource>"));
5798        assert!(xml.contains("<CacheNodeType>cache.t3.micro</CacheNodeType>"));
5799        assert!(xml.contains("<Engine>redis</Engine>"));
5800        assert!(xml.contains("<EngineVersion>7.1</EngineVersion>"));
5801        assert!(xml.contains("<NumCacheClusters>2</NumCacheClusters>"));
5802        assert!(xml.contains("<ARN>arn:aws:elasticache:us-east-1:123:snapshot:test-snap</ARN>"));
5803    }
5804}