1use std::convert::TryFrom;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use http::StatusCode;
6
7use fakecloud_aws::xml::xml_escape;
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9
10use crate::runtime::{ElastiCacheRuntime, RuntimeError};
11use crate::state::{
12 default_engine_versions, default_parameters_for_family, CacheCluster, CacheEngineVersion,
13 CacheParameterGroup, CacheSnapshot, CacheSubnetGroup, ElastiCacheState, ElastiCacheUser,
14 ElastiCacheUserGroup, EngineDefaultParameter, GlobalReplicationGroup,
15 GlobalReplicationGroupMember, RecurringCharge, ReplicationGroup, ReservedCacheNode,
16 ReservedCacheNodesOffering, ServerlessCache, ServerlessCacheDataStorage,
17 ServerlessCacheEcpuPerSecond, ServerlessCacheEndpoint, ServerlessCacheSnapshot,
18 ServerlessCacheUsageLimits, SharedElastiCacheState,
19};
20
/// ElastiCache API namespace URI for the 2015-02-02 API version.
// NOTE(review): presumably emitted as the XML namespace of responses — confirm at the use site.
const ELASTICACHE_NS: &str = "http://elasticache.amazonaws.com/doc/2015-02-02/";

/// Engine name for Redis; the create handlers fall back to it when `Engine` is omitted.
const ENGINE_REDIS: &str = "redis";
/// Engine name for Valkey.
const ENGINE_VALKEY: &str = "valkey";
/// Engine names accepted by `validate_engine`.
const SUPPORTED_ENGINES: &[&str] = &[ENGINE_REDIS, ENGINE_VALKEY];
28
29fn validate_engine(engine: &str) -> Result<(), AwsServiceError> {
30 if !SUPPORTED_ENGINES.contains(&engine) {
31 return Err(AwsServiceError::aws_error(
32 StatusCode::BAD_REQUEST,
33 "InvalidParameterValue",
34 format!("Invalid value for Engine: {engine}. Supported engines: redis, valkey"),
35 ));
36 }
37 Ok(())
38}
/// Action names reported by `ElastiCacheService::supported_actions`.
///
/// Each entry has a matching arm in `ElastiCacheService::handle`; keep the two
/// in sync when adding or removing an action.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddTagsToResource",
    "CreateCacheCluster",
    "CreateGlobalReplicationGroup",
    "CreateCacheSubnetGroup",
    "CreateReplicationGroup",
    "CreateServerlessCache",
    "CreateServerlessCacheSnapshot",
    "CreateSnapshot",
    "CreateUser",
    "CreateUserGroup",
    "DecreaseReplicaCount",
    "DeleteCacheCluster",
    "DeleteGlobalReplicationGroup",
    "DeleteCacheSubnetGroup",
    "DeleteReplicationGroup",
    "DeleteServerlessCache",
    "DeleteServerlessCacheSnapshot",
    "DeleteSnapshot",
    "DeleteUser",
    "DeleteUserGroup",
    "DescribeCacheClusters",
    "DescribeCacheEngineVersions",
    "DescribeGlobalReplicationGroups",
    "DescribeCacheParameterGroups",
    "DescribeReservedCacheNodes",
    "DescribeReservedCacheNodesOfferings",
    "DescribeCacheSubnetGroups",
    "DescribeEngineDefaultParameters",
    "DescribeReplicationGroups",
    "DescribeServerlessCaches",
    "DescribeServerlessCacheSnapshots",
    "DescribeSnapshots",
    "DescribeUserGroups",
    "DescribeUsers",
    "DisassociateGlobalReplicationGroup",
    "FailoverGlobalReplicationGroup",
    "IncreaseReplicaCount",
    "ListTagsForResource",
    "ModifyCacheSubnetGroup",
    "ModifyGlobalReplicationGroup",
    "ModifyReplicationGroup",
    "ModifyServerlessCache",
    "RemoveTagsFromResource",
    "TestFailover",
];
85
/// Request handler for the `elasticache` service.
pub struct ElastiCacheService {
    /// Shared service state; handlers access it through `read()` / `write()`.
    state: SharedElastiCacheState,
    /// Optional runtime used by the cluster and replication-group handlers;
    /// `None` until `with_runtime` is called.
    runtime: Option<Arc<ElastiCacheRuntime>>,
}
90
91impl ElastiCacheService {
92 pub fn new(state: SharedElastiCacheState) -> Self {
93 Self {
94 state,
95 runtime: None,
96 }
97 }
98
99 pub fn with_runtime(mut self, runtime: Arc<ElastiCacheRuntime>) -> Self {
100 self.runtime = Some(runtime);
101 self
102 }
103}
104
#[async_trait]
impl AwsService for ElastiCacheService {
    /// Service identifier; also passed to `action_not_implemented` for unknown actions.
    fn service_name(&self) -> &str {
        "elasticache"
    }

    /// Dispatches `request` to the handler matching `request.action`.
    ///
    /// Handlers that talk to the runtime are `async` and awaited here; the
    /// rest are synchronous. An action with no matching arm yields
    /// `AwsServiceError::action_not_implemented`.
    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        match request.action.as_str() {
            "AddTagsToResource" => self.add_tags_to_resource(&request),
            "CreateCacheCluster" => self.create_cache_cluster(&request).await,
            "CreateGlobalReplicationGroup" => self.create_global_replication_group(&request),
            "CreateCacheSubnetGroup" => self.create_cache_subnet_group(&request),
            "CreateReplicationGroup" => self.create_replication_group(&request).await,
            "CreateServerlessCache" => self.create_serverless_cache(&request).await,
            "CreateServerlessCacheSnapshot" => self.create_serverless_cache_snapshot(&request),
            "CreateSnapshot" => self.create_snapshot(&request),
            "CreateUser" => self.create_user(&request),
            "CreateUserGroup" => self.create_user_group(&request),
            "DecreaseReplicaCount" => self.decrease_replica_count(&request),
            "DeleteCacheCluster" => self.delete_cache_cluster(&request).await,
            "DeleteGlobalReplicationGroup" => self.delete_global_replication_group(&request),
            "DeleteCacheSubnetGroup" => self.delete_cache_subnet_group(&request),
            "DeleteReplicationGroup" => self.delete_replication_group(&request).await,
            "DeleteServerlessCache" => self.delete_serverless_cache(&request).await,
            "DeleteServerlessCacheSnapshot" => self.delete_serverless_cache_snapshot(&request),
            "DeleteSnapshot" => self.delete_snapshot(&request),
            "DeleteUser" => self.delete_user(&request),
            "DeleteUserGroup" => self.delete_user_group(&request),
            "DescribeCacheClusters" => self.describe_cache_clusters(&request),
            "DescribeCacheEngineVersions" => self.describe_cache_engine_versions(&request),
            "DescribeGlobalReplicationGroups" => self.describe_global_replication_groups(&request),
            "DescribeCacheParameterGroups" => self.describe_cache_parameter_groups(&request),
            "DescribeReservedCacheNodes" => self.describe_reserved_cache_nodes(&request),
            "DescribeReservedCacheNodesOfferings" => {
                self.describe_reserved_cache_nodes_offerings(&request)
            }
            "DescribeCacheSubnetGroups" => self.describe_cache_subnet_groups(&request),
            "DescribeEngineDefaultParameters" => self.describe_engine_default_parameters(&request),
            "DescribeReplicationGroups" => self.describe_replication_groups(&request),
            "DescribeServerlessCaches" => self.describe_serverless_caches(&request),
            "DescribeServerlessCacheSnapshots" => {
                self.describe_serverless_cache_snapshots(&request)
            }
            "DescribeSnapshots" => self.describe_snapshots(&request),
            "DescribeUserGroups" => self.describe_user_groups(&request),
            "DescribeUsers" => self.describe_users(&request),
            "DisassociateGlobalReplicationGroup" => {
                self.disassociate_global_replication_group(&request)
            }
            "FailoverGlobalReplicationGroup" => self.failover_global_replication_group(&request),
            "IncreaseReplicaCount" => self.increase_replica_count(&request),
            "ListTagsForResource" => self.list_tags_for_resource(&request),
            "ModifyCacheSubnetGroup" => self.modify_cache_subnet_group(&request),
            "ModifyGlobalReplicationGroup" => self.modify_global_replication_group(&request),
            "ModifyReplicationGroup" => self.modify_replication_group(&request),
            "ModifyServerlessCache" => self.modify_serverless_cache(&request),
            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
            "TestFailover" => self.test_failover(&request),
            // Anything without an arm above is reported as not implemented.
            _ => Err(AwsServiceError::action_not_implemented(
                self.service_name(),
                &request.action,
            )),
        }
    }

    /// Returns the static `SUPPORTED_ACTIONS` list.
    fn supported_actions(&self) -> &[&str] {
        SUPPORTED_ACTIONS
    }
}
174
175impl ElastiCacheService {
176 fn describe_cache_engine_versions(
177 &self,
178 request: &AwsRequest,
179 ) -> Result<AwsResponse, AwsServiceError> {
180 let engine = optional_param(request, "Engine");
181 let engine_version = optional_param(request, "EngineVersion");
182 let family = optional_param(request, "CacheParameterGroupFamily");
183 let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
184 let max_records = optional_usize_param(request, "MaxRecords")?;
185 let marker = optional_param(request, "Marker");
186
187 let mut versions = filter_engine_versions(
188 &default_engine_versions(),
189 &engine,
190 &engine_version,
191 &family,
192 );
193
194 if default_only.unwrap_or(false) {
195 let mut seen_engines = std::collections::HashSet::new();
197 versions.retain(|v| seen_engines.insert(v.engine.clone()));
198 }
199
200 let (page, next_marker) = paginate(&versions, marker.as_deref(), max_records);
201
202 let members_xml: String = page.iter().map(engine_version_xml).collect();
203 let marker_xml = next_marker
204 .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
205 .unwrap_or_default();
206
207 Ok(AwsResponse::xml(
208 StatusCode::OK,
209 xml_wrap(
210 "DescribeCacheEngineVersions",
211 &format!("<CacheEngineVersions>{members_xml}</CacheEngineVersions>{marker_xml}"),
212 &request.request_id,
213 ),
214 ))
215 }
216
217 fn describe_cache_parameter_groups(
218 &self,
219 request: &AwsRequest,
220 ) -> Result<AwsResponse, AwsServiceError> {
221 let group_name = optional_param(request, "CacheParameterGroupName");
222 let max_records = optional_usize_param(request, "MaxRecords")?;
223 let marker = optional_param(request, "Marker");
224
225 let state = self.state.read();
226
227 let groups: Vec<&CacheParameterGroup> = state
228 .parameter_groups
229 .iter()
230 .filter(|g| {
231 group_name
232 .as_ref()
233 .is_none_or(|name| g.cache_parameter_group_name == *name)
234 })
235 .collect();
236
237 if let Some(ref name) = group_name {
238 if groups.is_empty() {
239 return Err(AwsServiceError::aws_error(
240 StatusCode::NOT_FOUND,
241 "CacheParameterGroupNotFound",
242 format!("CacheParameterGroup {name} not found."),
243 ));
244 }
245 }
246
247 let (page, next_marker) = paginate(&groups, marker.as_deref(), max_records);
248
249 let members_xml: String = page.iter().map(|g| cache_parameter_group_xml(g)).collect();
250 let marker_xml = next_marker
251 .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
252 .unwrap_or_default();
253
254 Ok(AwsResponse::xml(
255 StatusCode::OK,
256 xml_wrap(
257 "DescribeCacheParameterGroups",
258 &format!("<CacheParameterGroups>{members_xml}</CacheParameterGroups>{marker_xml}"),
259 &request.request_id,
260 ),
261 ))
262 }
263
264 fn describe_reserved_cache_nodes(
265 &self,
266 request: &AwsRequest,
267 ) -> Result<AwsResponse, AwsServiceError> {
268 let reserved_cache_node_id = optional_param(request, "ReservedCacheNodeId");
269 let reserved_cache_nodes_offering_id =
270 optional_param(request, "ReservedCacheNodesOfferingId");
271 let cache_node_type = optional_param(request, "CacheNodeType");
272 let duration = parse_reserved_duration_filter(optional_param(request, "Duration"))?;
273 let product_description = optional_param(request, "ProductDescription");
274 let offering_type = optional_param(request, "OfferingType");
275 let max_records = optional_usize_param(request, "MaxRecords")?;
276 let marker = optional_param(request, "Marker");
277
278 let state = self.state.read();
279 let mut nodes: Vec<&ReservedCacheNode> = state.reserved_cache_nodes.values().collect();
280 nodes.retain(|node| {
281 reserved_cache_node_id
282 .as_ref()
283 .is_none_or(|expected| node.reserved_cache_node_id == *expected)
284 && reserved_cache_nodes_offering_id
285 .as_ref()
286 .is_none_or(|expected| node.reserved_cache_nodes_offering_id == *expected)
287 && cache_node_type
288 .as_ref()
289 .is_none_or(|expected| node.cache_node_type == *expected)
290 && duration.is_none_or(|expected| node.duration == expected)
291 && product_description
292 .as_ref()
293 .is_none_or(|expected| node.product_description == *expected)
294 && offering_type
295 .as_ref()
296 .is_none_or(|expected| node.offering_type == *expected)
297 });
298 nodes.sort_by(|left, right| {
299 left.reserved_cache_node_id
300 .cmp(&right.reserved_cache_node_id)
301 });
302
303 if let Some(ref id) = reserved_cache_node_id {
304 if nodes.is_empty() {
305 return Err(AwsServiceError::aws_error(
306 StatusCode::NOT_FOUND,
307 "ReservedCacheNodeNotFoundFault",
308 format!("ReservedCacheNode not found: {id}"),
309 ));
310 }
311 }
312
313 let (page, next_marker) = paginate(&nodes, marker.as_deref(), max_records);
314 let members_xml: String = page
315 .iter()
316 .map(|node| reserved_cache_node_xml(node))
317 .collect();
318 let marker_xml = next_marker
319 .map(|value| format!("<Marker>{}</Marker>", xml_escape(&value)))
320 .unwrap_or_default();
321
322 Ok(AwsResponse::xml(
323 StatusCode::OK,
324 xml_wrap(
325 "DescribeReservedCacheNodes",
326 &format!("<ReservedCacheNodes>{members_xml}</ReservedCacheNodes>{marker_xml}"),
327 &request.request_id,
328 ),
329 ))
330 }
331
332 fn describe_reserved_cache_nodes_offerings(
333 &self,
334 request: &AwsRequest,
335 ) -> Result<AwsResponse, AwsServiceError> {
336 let reserved_cache_nodes_offering_id =
337 optional_param(request, "ReservedCacheNodesOfferingId");
338 let cache_node_type = optional_param(request, "CacheNodeType");
339 let duration = parse_reserved_duration_filter(optional_param(request, "Duration"))?;
340 let product_description = optional_param(request, "ProductDescription");
341 let offering_type = optional_param(request, "OfferingType");
342 let max_records = optional_usize_param(request, "MaxRecords")?;
343 let marker = optional_param(request, "Marker");
344
345 let state = self.state.read();
346 let mut offerings: Vec<&ReservedCacheNodesOffering> =
347 state.reserved_cache_nodes_offerings.iter().collect();
348 offerings.retain(|offering| {
349 reserved_cache_nodes_offering_id
350 .as_ref()
351 .is_none_or(|expected| offering.reserved_cache_nodes_offering_id == *expected)
352 && cache_node_type
353 .as_ref()
354 .is_none_or(|expected| offering.cache_node_type == *expected)
355 && duration.is_none_or(|expected| offering.duration == expected)
356 && product_description
357 .as_ref()
358 .is_none_or(|expected| offering.product_description == *expected)
359 && offering_type
360 .as_ref()
361 .is_none_or(|expected| offering.offering_type == *expected)
362 });
363 offerings.sort_by(|left, right| {
364 left.reserved_cache_nodes_offering_id
365 .cmp(&right.reserved_cache_nodes_offering_id)
366 });
367
368 if let Some(ref id) = reserved_cache_nodes_offering_id {
369 if offerings.is_empty() {
370 return Err(AwsServiceError::aws_error(
371 StatusCode::NOT_FOUND,
372 "ReservedCacheNodesOfferingNotFoundFault",
373 format!("ReservedCacheNodesOffering not found: {id}"),
374 ));
375 }
376 }
377
378 let (page, next_marker) = paginate(&offerings, marker.as_deref(), max_records);
379 let members_xml: String = page
380 .iter()
381 .map(|offering| reserved_cache_nodes_offering_xml(offering))
382 .collect();
383 let marker_xml = next_marker
384 .map(|value| format!("<Marker>{}</Marker>", xml_escape(&value)))
385 .unwrap_or_default();
386
387 Ok(AwsResponse::xml(
388 StatusCode::OK,
389 xml_wrap(
390 "DescribeReservedCacheNodesOfferings",
391 &format!(
392 "<ReservedCacheNodesOfferings>{members_xml}</ReservedCacheNodesOfferings>{marker_xml}"
393 ),
394 &request.request_id,
395 ),
396 ))
397 }
398
399 fn describe_engine_default_parameters(
400 &self,
401 request: &AwsRequest,
402 ) -> Result<AwsResponse, AwsServiceError> {
403 let family = required_param(request, "CacheParameterGroupFamily")?;
404 let max_records = optional_usize_param(request, "MaxRecords")?;
405 let marker = optional_param(request, "Marker");
406
407 let params = default_parameters_for_family(&family);
408 let (page, next_marker) = paginate(¶ms, marker.as_deref(), max_records);
409
410 let params_xml: String = page.iter().map(parameter_xml).collect();
411 let marker_xml = next_marker
412 .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
413 .unwrap_or_default();
414
415 Ok(AwsResponse::xml(
416 StatusCode::OK,
417 xml_wrap(
418 "DescribeEngineDefaultParameters",
419 &format!(
420 "<EngineDefaults>\
421 <CacheParameterGroupFamily>{}</CacheParameterGroupFamily>\
422 <Parameters>{params_xml}</Parameters>\
423 {marker_xml}\
424 </EngineDefaults>",
425 xml_escape(&family),
426 ),
427 &request.request_id,
428 ),
429 ))
430 }
431
432 fn create_cache_subnet_group(
433 &self,
434 request: &AwsRequest,
435 ) -> Result<AwsResponse, AwsServiceError> {
436 let name = required_param(request, "CacheSubnetGroupName")?;
437 let description = required_param(request, "CacheSubnetGroupDescription")?;
438 let subnet_ids = parse_member_list(&request.query_params, "SubnetIds", "SubnetIdentifier");
439
440 if subnet_ids.is_empty() {
441 return Err(AwsServiceError::aws_error(
442 StatusCode::BAD_REQUEST,
443 "InvalidParameterValue",
444 "At least one subnet ID must be specified.".to_string(),
445 ));
446 }
447
448 let mut state = self.state.write();
449
450 if state.subnet_groups.contains_key(&name) {
451 return Err(AwsServiceError::aws_error(
452 StatusCode::BAD_REQUEST,
453 "CacheSubnetGroupAlreadyExists",
454 format!("Cache subnet group {name} already exists."),
455 ));
456 }
457
458 let arn = format!(
459 "arn:aws:elasticache:{}:{}:subnetgroup:{}",
460 state.region, state.account_id, name
461 );
462 let vpc_id = format!(
463 "vpc-{:08x}",
464 name.as_bytes()
465 .iter()
466 .fold(0u32, |acc, &b| acc.wrapping_add(b as u32))
467 );
468
469 let group = CacheSubnetGroup {
470 cache_subnet_group_name: name.clone(),
471 cache_subnet_group_description: description,
472 vpc_id,
473 subnet_ids,
474 arn,
475 };
476
477 let xml = cache_subnet_group_xml(&group, &state.region);
478 state.register_arn(&group.arn);
479 state.subnet_groups.insert(name, group);
480
481 Ok(AwsResponse::xml(
482 StatusCode::OK,
483 xml_wrap(
484 "CreateCacheSubnetGroup",
485 &format!("<CacheSubnetGroup>{xml}</CacheSubnetGroup>"),
486 &request.request_id,
487 ),
488 ))
489 }
490
491 fn describe_cache_subnet_groups(
492 &self,
493 request: &AwsRequest,
494 ) -> Result<AwsResponse, AwsServiceError> {
495 let group_name = optional_param(request, "CacheSubnetGroupName");
496 let max_records = optional_usize_param(request, "MaxRecords")?;
497 let marker = optional_param(request, "Marker");
498
499 let state = self.state.read();
500
501 let groups: Vec<&CacheSubnetGroup> = if let Some(ref name) = group_name {
502 match state.subnet_groups.get(name) {
503 Some(g) => vec![g],
504 None => {
505 return Err(AwsServiceError::aws_error(
506 StatusCode::NOT_FOUND,
507 "CacheSubnetGroupNotFoundFault",
508 format!("Cache subnet group {name} not found."),
509 ));
510 }
511 }
512 } else {
513 let mut groups: Vec<&CacheSubnetGroup> = state.subnet_groups.values().collect();
514 groups.sort_by(|a, b| a.cache_subnet_group_name.cmp(&b.cache_subnet_group_name));
515 groups
516 };
517
518 let (page, next_marker) = paginate(&groups, marker.as_deref(), max_records);
519
520 let members_xml: String = page
521 .iter()
522 .map(|g| {
523 format!(
524 "<CacheSubnetGroup>{}</CacheSubnetGroup>",
525 cache_subnet_group_xml(g, &state.region)
526 )
527 })
528 .collect();
529 let marker_xml = next_marker
530 .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
531 .unwrap_or_default();
532
533 Ok(AwsResponse::xml(
534 StatusCode::OK,
535 xml_wrap(
536 "DescribeCacheSubnetGroups",
537 &format!("<CacheSubnetGroups>{members_xml}</CacheSubnetGroups>{marker_xml}"),
538 &request.request_id,
539 ),
540 ))
541 }
542
543 fn delete_cache_subnet_group(
544 &self,
545 request: &AwsRequest,
546 ) -> Result<AwsResponse, AwsServiceError> {
547 let name = required_param(request, "CacheSubnetGroupName")?;
548
549 let mut state = self.state.write();
550
551 if name == "default" {
552 return Err(AwsServiceError::aws_error(
553 StatusCode::BAD_REQUEST,
554 "CacheSubnetGroupInUse",
555 "Cannot delete default cache subnet group.".to_string(),
556 ));
557 }
558
559 if let Some(group) = state.subnet_groups.remove(&name) {
560 state.tags.remove(&group.arn);
561 } else {
562 return Err(AwsServiceError::aws_error(
563 StatusCode::NOT_FOUND,
564 "CacheSubnetGroupNotFoundFault",
565 format!("Cache subnet group {name} not found."),
566 ));
567 }
568
569 Ok(AwsResponse::xml(
570 StatusCode::OK,
571 xml_wrap("DeleteCacheSubnetGroup", "", &request.request_id),
572 ))
573 }
574
575 fn modify_cache_subnet_group(
576 &self,
577 request: &AwsRequest,
578 ) -> Result<AwsResponse, AwsServiceError> {
579 let name = required_param(request, "CacheSubnetGroupName")?;
580 let description = optional_param(request, "CacheSubnetGroupDescription");
581 let subnet_ids = parse_member_list(&request.query_params, "SubnetIds", "SubnetIdentifier");
582
583 let mut state = self.state.write();
584 let region = state.region.clone();
585
586 let group = state.subnet_groups.get_mut(&name).ok_or_else(|| {
587 AwsServiceError::aws_error(
588 StatusCode::NOT_FOUND,
589 "CacheSubnetGroupNotFoundFault",
590 format!("Cache subnet group {name} not found."),
591 )
592 })?;
593
594 if let Some(desc) = description {
595 group.cache_subnet_group_description = desc;
596 }
597 if !subnet_ids.is_empty() {
598 group.subnet_ids = subnet_ids;
599 }
600
601 let xml = cache_subnet_group_xml(group, ®ion);
602
603 Ok(AwsResponse::xml(
604 StatusCode::OK,
605 xml_wrap(
606 "ModifyCacheSubnetGroup",
607 &format!("<CacheSubnetGroup>{xml}</CacheSubnetGroup>"),
608 &request.request_id,
609 ),
610 ))
611 }
612
    /// Handles `CreateCacheCluster`.
    ///
    /// Validates the request, reserves the cluster id in state, obtains a
    /// backing instance from the runtime via `ensure_redis`, then records the
    /// finished cluster (and links it to its replication group, if any).
    /// Every failure after the reservation cancels it again.
    ///
    /// Defaults applied when parameters are omitted: engine `redis`, engine
    /// version `7.1` (`8.0` for valkey), node type `cache.t3.micro`, one cache
    /// node, subnet group `default`, auto minor version upgrade enabled, and
    /// availability zone `<region>a`.
    ///
    /// # Errors
    ///
    /// * Propagates errors from a missing `CacheClusterId`, an unsupported
    ///   `Engine`, or an unparsable `AutoMinorVersionUpgrade`.
    /// * HTTP 400 `InvalidParameterValue` when `NumCacheNodes` is not an
    ///   integer or is less than 1.
    /// * HTTP 400 `CacheClusterAlreadyExists` when the id cannot be reserved.
    /// * HTTP 404 `CacheSubnetGroupNotFoundFault` /
    ///   `ReplicationGroupNotFoundFault` when a referenced group is missing.
    /// * HTTP 503 when no runtime is configured, or the runtime's own error
    ///   mapped through `runtime_error_to_service_error`.
    async fn create_cache_cluster(
        &self,
        request: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let cache_cluster_id = required_param(request, "CacheClusterId")?;
        let engine = optional_param(request, "Engine").unwrap_or_else(|| ENGINE_REDIS.to_string());
        validate_engine(&engine)?;

        let default_version = if engine == ENGINE_VALKEY {
            "8.0"
        } else {
            "7.1"
        };
        let engine_version =
            optional_param(request, "EngineVersion").unwrap_or_else(|| default_version.to_string());
        let cache_node_type = optional_param(request, "CacheNodeType")
            .unwrap_or_else(|| "cache.t3.micro".to_string());
        let num_cache_nodes = match optional_param(request, "NumCacheNodes") {
            Some(v) => {
                let n = v.parse::<i32>().map_err(|_| {
                    AwsServiceError::aws_error(
                        StatusCode::BAD_REQUEST,
                        "InvalidParameterValue",
                        format!("Invalid value for NumCacheNodes: '{v}'"),
                    )
                })?;
                if n < 1 {
                    return Err(AwsServiceError::aws_error(
                        StatusCode::BAD_REQUEST,
                        "InvalidParameterValue",
                        format!("NumCacheNodes must be a positive integer, got {n}"),
                    ));
                }
                n
            }
            None => 1,
        };
        let cache_subnet_group_name =
            optional_param(request, "CacheSubnetGroupName").or_else(|| Some("default".to_string()));
        let replication_group_id = optional_param(request, "ReplicationGroupId");
        let auto_minor_version_upgrade =
            parse_optional_bool(optional_param(request, "AutoMinorVersionUpgrade").as_deref())?
                .unwrap_or(true);

        // Reserve the id and validate references under one write lock. The
        // guard is dropped at the end of this block, before the await below.
        let (preferred_availability_zone, arn) = {
            let mut state = self.state.write();
            if !state.begin_cache_cluster_creation(&cache_cluster_id) {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "CacheClusterAlreadyExists",
                    format!("CacheCluster {cache_cluster_id} already exists."),
                ));
            }

            if let Some(ref subnet_group_name) = cache_subnet_group_name {
                if !state.subnet_groups.contains_key(subnet_group_name) {
                    // Release the reservation taken above before bailing out.
                    state.cancel_cache_cluster_creation(&cache_cluster_id);
                    return Err(AwsServiceError::aws_error(
                        StatusCode::NOT_FOUND,
                        "CacheSubnetGroupNotFoundFault",
                        format!("Cache subnet group {subnet_group_name} not found."),
                    ));
                }
            }

            if let Some(ref group_id) = replication_group_id {
                if !state.replication_groups.contains_key(group_id) {
                    // Release the reservation taken above before bailing out.
                    state.cancel_cache_cluster_creation(&cache_cluster_id);
                    return Err(AwsServiceError::aws_error(
                        StatusCode::NOT_FOUND,
                        "ReplicationGroupNotFoundFault",
                        format!("ReplicationGroup {group_id} not found."),
                    ));
                }
            }

            let preferred_availability_zone = optional_param(request, "PreferredAvailabilityZone")
                .unwrap_or_else(|| format!("{}a", state.region));
            let arn = format!(
                "arn:aws:elasticache:{}:{}:cluster:{}",
                state.region, state.account_id, cache_cluster_id
            );
            (preferred_availability_zone, arn)
        };

        // Without a runtime the cluster cannot be backed; undo the reservation.
        let runtime = self.runtime.as_ref().ok_or_else(|| {
            self.state
                .write()
                .cancel_cache_cluster_creation(&cache_cluster_id);
            AwsServiceError::aws_error(
                StatusCode::SERVICE_UNAVAILABLE,
                "InvalidParameterValue",
                "Docker/Podman is required for ElastiCache cache clusters but is not available"
                    .to_string(),
            )
        })?;

        let running = match runtime.ensure_redis(&cache_cluster_id).await {
            Ok(r) => r,
            Err(e) => {
                // Runtime failed: undo the reservation so the id can be reused.
                self.state
                    .write()
                    .cancel_cache_cluster_creation(&cache_cluster_id);
                return Err(runtime_error_to_service_error(e));
            }
        };

        let cluster = CacheCluster {
            cache_cluster_id: cache_cluster_id.clone(),
            cache_node_type,
            engine,
            engine_version,
            cache_cluster_status: "available".to_string(),
            num_cache_nodes,
            preferred_availability_zone,
            cache_subnet_group_name,
            auto_minor_version_upgrade,
            arn,
            created_at: chrono::Utc::now().to_rfc3339(),
            // The endpoint is always reported on loopback at the runtime's host port.
            endpoint_address: "127.0.0.1".to_string(),
            endpoint_port: running.host_port,
            container_id: running.container_id,
            host_port: running.host_port,
            replication_group_id,
        };

        let xml = cache_cluster_xml(&cluster, true);
        {
            let mut state = self.state.write();
            state.finish_cache_cluster_creation(cluster.clone());
            if let Some(ref group_id) = cluster.replication_group_id {
                add_cluster_to_replication_group(&mut state, group_id, &cluster.cache_cluster_id);
            }
        }

        Ok(AwsResponse::xml(
            StatusCode::OK,
            xml_wrap(
                "CreateCacheCluster",
                &format!("<CacheCluster>{xml}</CacheCluster>"),
                &request.request_id,
            ),
        ))
    }
757
758 fn describe_cache_clusters(
759 &self,
760 request: &AwsRequest,
761 ) -> Result<AwsResponse, AwsServiceError> {
762 let cache_cluster_id = optional_param(request, "CacheClusterId");
763 let show_cache_node_info =
764 parse_optional_bool(optional_param(request, "ShowCacheNodeInfo").as_deref())?
765 .unwrap_or(false);
766 let max_records = optional_usize_param(request, "MaxRecords")?;
767 let marker = optional_param(request, "Marker");
768
769 let state = self.state.read();
770 let clusters: Vec<&CacheCluster> = if let Some(ref cluster_id) = cache_cluster_id {
771 match state.cache_clusters.get(cluster_id) {
772 Some(cluster) => vec![cluster],
773 None => {
774 return Err(AwsServiceError::aws_error(
775 StatusCode::NOT_FOUND,
776 "CacheClusterNotFound",
777 format!("CacheCluster {cluster_id} not found."),
778 ));
779 }
780 }
781 } else {
782 let mut clusters: Vec<&CacheCluster> = state.cache_clusters.values().collect();
783 clusters.sort_by(|a, b| a.cache_cluster_id.cmp(&b.cache_cluster_id));
784 clusters
785 };
786
787 let (page, next_marker) = paginate(&clusters, marker.as_deref(), max_records);
788 let members_xml: String = page
789 .iter()
790 .map(|cluster| {
791 format!(
792 "<CacheCluster>{}</CacheCluster>",
793 cache_cluster_xml(cluster, show_cache_node_info)
794 )
795 })
796 .collect();
797 let marker_xml = next_marker
798 .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
799 .unwrap_or_default();
800
801 Ok(AwsResponse::xml(
802 StatusCode::OK,
803 xml_wrap(
804 "DescribeCacheClusters",
805 &format!("<CacheClusters>{members_xml}</CacheClusters>{marker_xml}"),
806 &request.request_id,
807 ),
808 ))
809 }
810
811 async fn delete_cache_cluster(
812 &self,
813 request: &AwsRequest,
814 ) -> Result<AwsResponse, AwsServiceError> {
815 let cache_cluster_id = required_param(request, "CacheClusterId")?;
816
817 let cluster = {
818 let mut state = self.state.write();
819 let cluster = state
820 .cache_clusters
821 .remove(&cache_cluster_id)
822 .ok_or_else(|| {
823 AwsServiceError::aws_error(
824 StatusCode::NOT_FOUND,
825 "CacheClusterNotFound",
826 format!("CacheCluster {cache_cluster_id} not found."),
827 )
828 })?;
829 if let Some(ref group_id) = cluster.replication_group_id {
830 remove_cluster_from_replication_group(
831 &mut state,
832 group_id,
833 &cluster.cache_cluster_id,
834 );
835 }
836 state.tags.remove(&cluster.arn);
837 cluster
838 };
839
840 if let Some(ref runtime) = self.runtime {
841 runtime.stop_container(&cache_cluster_id).await;
842 }
843
844 let mut deleted_cluster = cluster;
845 deleted_cluster.cache_cluster_status = "deleting".to_string();
846 let xml = cache_cluster_xml(&deleted_cluster, true);
847
848 Ok(AwsResponse::xml(
849 StatusCode::OK,
850 xml_wrap(
851 "DeleteCacheCluster",
852 &format!("<CacheCluster>{xml}</CacheCluster>"),
853 &request.request_id,
854 ),
855 ))
856 }
857
858 async fn create_replication_group(
859 &self,
860 request: &AwsRequest,
861 ) -> Result<AwsResponse, AwsServiceError> {
862 let replication_group_id = required_param(request, "ReplicationGroupId")?;
863 let description = required_param(request, "ReplicationGroupDescription")?;
864 let engine = optional_param(request, "Engine").unwrap_or_else(|| ENGINE_REDIS.to_string());
865 validate_engine(&engine)?;
866 let default_version = if engine == ENGINE_VALKEY {
867 "8.0"
868 } else {
869 "7.1"
870 };
871 let engine_version =
872 optional_param(request, "EngineVersion").unwrap_or_else(|| default_version.to_string());
873 let cache_node_type = optional_param(request, "CacheNodeType")
874 .unwrap_or_else(|| "cache.t3.micro".to_string());
875 let num_cache_clusters = match optional_param(request, "NumCacheClusters") {
876 Some(v) => {
877 let n = v.parse::<i32>().map_err(|_| {
878 AwsServiceError::aws_error(
879 StatusCode::BAD_REQUEST,
880 "InvalidParameterValue",
881 format!("Invalid value for NumCacheClusters: '{v}'"),
882 )
883 })?;
884 if n < 1 {
885 return Err(AwsServiceError::aws_error(
886 StatusCode::BAD_REQUEST,
887 "InvalidParameterValue",
888 format!("NumCacheClusters must be a positive integer, got {n}"),
889 ));
890 }
891 n
892 }
893 None => 1,
894 };
895 let automatic_failover =
896 parse_optional_bool(optional_param(request, "AutomaticFailoverEnabled").as_deref())?
897 .unwrap_or(false);
898 {
900 let mut state = self.state.write();
901 if !state.begin_replication_group_creation(&replication_group_id) {
902 return Err(AwsServiceError::aws_error(
903 StatusCode::BAD_REQUEST,
904 "ReplicationGroupAlreadyExistsFault",
905 format!("ReplicationGroup {replication_group_id} already exists."),
906 ));
907 }
908 }
909
910 let runtime = self.runtime.as_ref().ok_or_else(|| {
911 self.state
912 .write()
913 .cancel_replication_group_creation(&replication_group_id);
914 AwsServiceError::aws_error(
915 StatusCode::SERVICE_UNAVAILABLE,
916 "InvalidParameterValue",
917 "Docker/Podman is required for ElastiCache replication groups but is not available"
918 .to_string(),
919 )
920 })?;
921
922 let running = match runtime.ensure_redis(&replication_group_id).await {
923 Ok(r) => r,
924 Err(e) => {
925 self.state
926 .write()
927 .cancel_replication_group_creation(&replication_group_id);
928 return Err(runtime_error_to_service_error(e));
929 }
930 };
931
932 let member_clusters: Vec<String> = (1..=num_cache_clusters)
933 .map(|i| format!("{replication_group_id}-{i:03}"))
934 .collect();
935
936 let (arn, region) = {
937 let state = self.state.read();
938 let arn = format!(
939 "arn:aws:elasticache:{}:{}:replicationgroup:{}",
940 state.region, state.account_id, replication_group_id
941 );
942 (arn, state.region.clone())
943 };
944
945 let group = ReplicationGroup {
946 replication_group_id: replication_group_id.clone(),
947 description,
948 global_replication_group_id: None,
949 global_replication_group_role: None,
950 status: "available".to_string(),
951 cache_node_type,
952 engine,
953 engine_version,
954 num_cache_clusters,
955 automatic_failover_enabled: automatic_failover,
956 endpoint_address: "127.0.0.1".to_string(),
957 endpoint_port: running.host_port,
958 arn,
959 created_at: chrono::Utc::now().to_rfc3339(),
960 container_id: running.container_id,
961 host_port: running.host_port,
962 member_clusters,
963 snapshot_retention_limit: 0,
964 snapshot_window: "05:00-09:00".to_string(),
965 };
966
967 let xml = replication_group_xml(&group, ®ion);
968 self.state.write().finish_replication_group_creation(group);
969
970 Ok(AwsResponse::xml(
971 StatusCode::OK,
972 xml_wrap(
973 "CreateReplicationGroup",
974 &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
975 &request.request_id,
976 ),
977 ))
978 }
979
980 fn create_global_replication_group(
981 &self,
982 request: &AwsRequest,
983 ) -> Result<AwsResponse, AwsServiceError> {
984 let suffix = required_param(request, "GlobalReplicationGroupIdSuffix")?;
985 let primary_replication_group_id = required_param(request, "PrimaryReplicationGroupId")?;
986 let description =
987 optional_param(request, "GlobalReplicationGroupDescription").unwrap_or_default();
988
989 let mut state = self.state.write();
990 let region = state.region.clone();
991 let account_id = state.account_id.clone();
992 let global_replication_group_id = global_replication_group_id(®ion, suffix.as_str());
993 if state
994 .global_replication_groups
995 .contains_key(&global_replication_group_id)
996 {
997 return Err(AwsServiceError::aws_error(
998 StatusCode::BAD_REQUEST,
999 "GlobalReplicationGroupAlreadyExistsFault",
1000 format!("GlobalReplicationGroup {global_replication_group_id} already exists."),
1001 ));
1002 }
1003
1004 let primary_group = state
1005 .replication_groups
1006 .get_mut(&primary_replication_group_id)
1007 .ok_or_else(|| {
1008 AwsServiceError::aws_error(
1009 StatusCode::NOT_FOUND,
1010 "ReplicationGroupNotFoundFault",
1011 format!("ReplicationGroup {primary_replication_group_id} not found."),
1012 )
1013 })?;
1014
1015 if primary_group.global_replication_group_id.is_some() {
1016 return Err(AwsServiceError::aws_error(
1017 StatusCode::BAD_REQUEST,
1018 "InvalidReplicationGroupStateFault",
1019 format!(
1020 "ReplicationGroup {primary_replication_group_id} is already associated with a GlobalReplicationGroup."
1021 ),
1022 ));
1023 }
1024
1025 primary_group.global_replication_group_id = Some(global_replication_group_id.clone());
1026 primary_group.global_replication_group_role = Some("primary".to_string());
1027
1028 let group = GlobalReplicationGroup {
1029 global_replication_group_id: global_replication_group_id.clone(),
1030 global_replication_group_description: description,
1031 status: "available".to_string(),
1032 cache_node_type: primary_group.cache_node_type.clone(),
1033 engine: primary_group.engine.clone(),
1034 engine_version: primary_group.engine_version.clone(),
1035 members: vec![GlobalReplicationGroupMember {
1036 replication_group_id: primary_group.replication_group_id.clone(),
1037 replication_group_region: region.clone(),
1038 role: "primary".to_string(),
1039 automatic_failover: primary_group.automatic_failover_enabled,
1040 status: "associated".to_string(),
1041 }],
1042 cluster_enabled: false,
1043 arn: format!(
1044 "arn:aws:elasticache:{}:{}:globalreplicationgroup:{}",
1045 region, account_id, global_replication_group_id
1046 ),
1047 };
1048
1049 let xml = global_replication_group_xml(&group, true);
1050 state
1051 .global_replication_groups
1052 .insert(global_replication_group_id, group);
1053
1054 Ok(AwsResponse::xml(
1055 StatusCode::OK,
1056 xml_wrap(
1057 "CreateGlobalReplicationGroup",
1058 &format!("<GlobalReplicationGroup>{xml}</GlobalReplicationGroup>"),
1059 &request.request_id,
1060 ),
1061 ))
1062 }
1063
1064 fn describe_global_replication_groups(
1065 &self,
1066 request: &AwsRequest,
1067 ) -> Result<AwsResponse, AwsServiceError> {
1068 let global_replication_group_id = optional_param(request, "GlobalReplicationGroupId");
1069 let max_records = optional_usize_param(request, "MaxRecords")?;
1070 let marker = optional_param(request, "Marker");
1071 let show_member_info =
1072 parse_optional_bool(optional_param(request, "ShowMemberInfo").as_deref())?
1073 .unwrap_or(false);
1074
1075 let state = self.state.read();
1076 let groups: Vec<&GlobalReplicationGroup> = if let Some(ref global_replication_group_id) =
1077 global_replication_group_id
1078 {
1079 match state
1080 .global_replication_groups
1081 .get(global_replication_group_id)
1082 {
1083 Some(group) => vec![group],
1084 None => {
1085 return Err(AwsServiceError::aws_error(
1086 StatusCode::NOT_FOUND,
1087 "GlobalReplicationGroupNotFoundFault",
1088 format!("GlobalReplicationGroup {global_replication_group_id} not found."),
1089 ));
1090 }
1091 }
1092 } else {
1093 let mut groups: Vec<&GlobalReplicationGroup> =
1094 state.global_replication_groups.values().collect();
1095 groups.sort_by(|a, b| {
1096 a.global_replication_group_id
1097 .cmp(&b.global_replication_group_id)
1098 });
1099 groups
1100 };
1101
1102 let (page, next_marker) = paginate(&groups, marker.as_deref(), max_records);
1103 let groups_xml: String = page
1104 .iter()
1105 .map(|group| {
1106 format!(
1107 "<GlobalReplicationGroup>{}</GlobalReplicationGroup>",
1108 global_replication_group_xml(group, show_member_info)
1109 )
1110 })
1111 .collect();
1112 let marker_xml = next_marker
1113 .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
1114 .unwrap_or_default();
1115
1116 Ok(AwsResponse::xml(
1117 StatusCode::OK,
1118 xml_wrap(
1119 "DescribeGlobalReplicationGroups",
1120 &format!(
1121 "<GlobalReplicationGroups>{groups_xml}</GlobalReplicationGroups>{marker_xml}"
1122 ),
1123 &request.request_id,
1124 ),
1125 ))
1126 }
1127
1128 fn describe_replication_groups(
1129 &self,
1130 request: &AwsRequest,
1131 ) -> Result<AwsResponse, AwsServiceError> {
1132 let group_id = optional_param(request, "ReplicationGroupId");
1133 let max_records = optional_usize_param(request, "MaxRecords")?;
1134 let marker = optional_param(request, "Marker");
1135
1136 let state = self.state.read();
1137 let region = state.region.clone();
1138
1139 let groups: Vec<&ReplicationGroup> = if let Some(ref id) = group_id {
1140 match state.replication_groups.get(id) {
1141 Some(g) => vec![g],
1142 None => {
1143 return Err(AwsServiceError::aws_error(
1144 StatusCode::NOT_FOUND,
1145 "ReplicationGroupNotFoundFault",
1146 format!("ReplicationGroup {id} not found."),
1147 ));
1148 }
1149 }
1150 } else {
1151 let mut groups: Vec<&ReplicationGroup> = state.replication_groups.values().collect();
1152 groups.sort_by(|a, b| a.replication_group_id.cmp(&b.replication_group_id));
1153 groups
1154 };
1155
1156 let (page, next_marker) = paginate(&groups, marker.as_deref(), max_records);
1157
1158 let members_xml: String = page
1159 .iter()
1160 .map(|g| {
1161 format!(
1162 "<ReplicationGroup>{}</ReplicationGroup>",
1163 replication_group_xml(g, ®ion)
1164 )
1165 })
1166 .collect();
1167 let marker_xml = next_marker
1168 .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
1169 .unwrap_or_default();
1170
1171 Ok(AwsResponse::xml(
1172 StatusCode::OK,
1173 xml_wrap(
1174 "DescribeReplicationGroups",
1175 &format!("<ReplicationGroups>{members_xml}</ReplicationGroups>{marker_xml}"),
1176 &request.request_id,
1177 ),
1178 ))
1179 }
1180
1181 fn delete_global_replication_group(
1182 &self,
1183 request: &AwsRequest,
1184 ) -> Result<AwsResponse, AwsServiceError> {
1185 let global_replication_group_id = required_param(request, "GlobalReplicationGroupId")?;
1186 let retain_primary = parse_required_bool(request, "RetainPrimaryReplicationGroup")?;
1187
1188 let mut state = self.state.write();
1189 let mut group = state
1190 .global_replication_groups
1191 .remove(&global_replication_group_id)
1192 .ok_or_else(|| {
1193 AwsServiceError::aws_error(
1194 StatusCode::NOT_FOUND,
1195 "GlobalReplicationGroupNotFoundFault",
1196 format!("GlobalReplicationGroup {global_replication_group_id} not found."),
1197 )
1198 })?;
1199
1200 for member in &group.members {
1201 if !retain_primary && member.role == "primary" {
1202 if let Some(rg) = state
1204 .replication_groups
1205 .remove(&member.replication_group_id)
1206 {
1207 state.tags.remove(&rg.arn);
1208 }
1209 } else if let Some(replication_group) = state
1210 .replication_groups
1211 .get_mut(&member.replication_group_id)
1212 {
1213 replication_group.global_replication_group_id = None;
1214 replication_group.global_replication_group_role = None;
1215 }
1216 }
1217
1218 group.status = "deleting".to_string();
1219 let xml = global_replication_group_xml(&group, true);
1220
1221 Ok(AwsResponse::xml(
1222 StatusCode::OK,
1223 xml_wrap(
1224 "DeleteGlobalReplicationGroup",
1225 &format!("<GlobalReplicationGroup>{xml}</GlobalReplicationGroup>"),
1226 &request.request_id,
1227 ),
1228 ))
1229 }
1230
1231 async fn delete_replication_group(
1232 &self,
1233 request: &AwsRequest,
1234 ) -> Result<AwsResponse, AwsServiceError> {
1235 let replication_group_id = required_param(request, "ReplicationGroupId")?;
1236
1237 let group = {
1238 let mut state = self.state.write();
1239 let g = state
1240 .replication_groups
1241 .remove(&replication_group_id)
1242 .ok_or_else(|| {
1243 AwsServiceError::aws_error(
1244 StatusCode::NOT_FOUND,
1245 "ReplicationGroupNotFoundFault",
1246 format!("ReplicationGroup {replication_group_id} not found."),
1247 )
1248 })?;
1249 state.tags.remove(&g.arn);
1250 g
1251 };
1252
1253 if let Some(ref runtime) = self.runtime {
1254 runtime.stop_container(&replication_group_id).await;
1255 }
1256
1257 let region = self.state.read().region.clone();
1258 let mut deleted_group = group;
1259 deleted_group.status = "deleting".to_string();
1260 let xml = replication_group_xml(&deleted_group, ®ion);
1261
1262 Ok(AwsResponse::xml(
1263 StatusCode::OK,
1264 xml_wrap(
1265 "DeleteReplicationGroup",
1266 &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
1267 &request.request_id,
1268 ),
1269 ))
1270 }
1271
1272 async fn create_serverless_cache(
1273 &self,
1274 request: &AwsRequest,
1275 ) -> Result<AwsResponse, AwsServiceError> {
1276 let serverless_cache_name = required_param(request, "ServerlessCacheName")?;
1277 let engine = required_param(request, "Engine")?;
1278 validate_serverless_engine(&engine)?;
1279
1280 let description = optional_param(request, "Description").unwrap_or_default();
1281 let major_engine_version = optional_param(request, "MajorEngineVersion")
1282 .unwrap_or_else(|| default_major_engine_version(&engine).to_string());
1283 let full_engine_version = default_full_engine_version(&engine, &major_engine_version)?;
1284 let cache_usage_limits = parse_cache_usage_limits(request)?;
1285 let security_group_ids =
1286 parse_query_list_param(request, "SecurityGroupIds", "SecurityGroupId");
1287 let subnet_ids = parse_query_list_param(request, "SubnetIds", "SubnetId");
1288 let kms_key_id = optional_param(request, "KmsKeyId");
1289 let user_group_id = optional_param(request, "UserGroupId");
1290 let snapshot_retention_limit =
1291 optional_non_negative_i32_param(request, "SnapshotRetentionLimit")?;
1292 let daily_snapshot_time = optional_param(request, "DailySnapshotTime");
1293 let tags = parse_tags(request)?;
1294
1295 let (arn, endpoint_address) = {
1296 let mut state = self.state.write();
1297 if !state.begin_serverless_cache_creation(&serverless_cache_name) {
1298 return Err(AwsServiceError::aws_error(
1299 StatusCode::BAD_REQUEST,
1300 "ServerlessCacheAlreadyExistsFault",
1301 format!("ServerlessCache {serverless_cache_name} already exists."),
1302 ));
1303 }
1304
1305 if let Some(ref group_id) = user_group_id {
1306 let user_group_status = match state.user_groups.get(group_id) {
1307 Some(user_group) => user_group.status.clone(),
1308 None => {
1309 state.cancel_serverless_cache_creation(&serverless_cache_name);
1310 return Err(AwsServiceError::aws_error(
1311 StatusCode::NOT_FOUND,
1312 "UserGroupNotFound",
1313 format!("User group {group_id} not found."),
1314 ));
1315 }
1316 };
1317 if user_group_status != "active" {
1318 state.cancel_serverless_cache_creation(&serverless_cache_name);
1319 return Err(AwsServiceError::aws_error(
1320 StatusCode::BAD_REQUEST,
1321 "InvalidUserGroupState",
1322 format!("User group {group_id} is not in active state."),
1323 ));
1324 }
1325 }
1326
1327 let arn = format!(
1328 "arn:aws:elasticache:{}:{}:serverlesscache:{}",
1329 state.region, state.account_id, serverless_cache_name
1330 );
1331 (arn, "127.0.0.1".to_string())
1332 };
1333
1334 let runtime = self.runtime.as_ref().ok_or_else(|| {
1335 self.state
1336 .write()
1337 .cancel_serverless_cache_creation(&serverless_cache_name);
1338 AwsServiceError::aws_error(
1339 StatusCode::SERVICE_UNAVAILABLE,
1340 "InvalidParameterValue",
1341 "Docker/Podman is required for ElastiCache serverless caches but is not available"
1342 .to_string(),
1343 )
1344 })?;
1345
1346 let running = match runtime.ensure_redis(&serverless_cache_name).await {
1347 Ok(r) => r,
1348 Err(e) => {
1349 self.state
1350 .write()
1351 .cancel_serverless_cache_creation(&serverless_cache_name);
1352 return Err(runtime_error_to_service_error(e));
1353 }
1354 };
1355
1356 let endpoint = ServerlessCacheEndpoint {
1357 address: endpoint_address.clone(),
1358 port: running.host_port,
1359 };
1360 let reader_endpoint = ServerlessCacheEndpoint {
1361 address: endpoint_address,
1362 port: running.host_port,
1363 };
1364 let cache = ServerlessCache {
1365 serverless_cache_name: serverless_cache_name.clone(),
1366 description,
1367 engine,
1368 major_engine_version,
1369 full_engine_version,
1370 status: "available".to_string(),
1371 endpoint,
1372 reader_endpoint,
1373 arn: arn.clone(),
1374 created_at: chrono::Utc::now().to_rfc3339(),
1375 cache_usage_limits,
1376 security_group_ids,
1377 subnet_ids,
1378 kms_key_id,
1379 user_group_id,
1380 snapshot_retention_limit,
1381 daily_snapshot_time,
1382 container_id: running.container_id,
1383 host_port: running.host_port,
1384 };
1385
1386 let xml = serverless_cache_xml(&cache);
1387 {
1388 let mut state = self.state.write();
1389 state.finish_serverless_cache_creation(cache.clone());
1390 if !tags.is_empty() {
1391 merge_tags(state.tags.entry(arn).or_default(), &tags);
1392 }
1393 }
1394
1395 Ok(AwsResponse::xml(
1396 StatusCode::OK,
1397 xml_wrap(
1398 "CreateServerlessCache",
1399 &format!("<ServerlessCache>{xml}</ServerlessCache>"),
1400 &request.request_id,
1401 ),
1402 ))
1403 }
1404
1405 fn describe_serverless_caches(
1406 &self,
1407 request: &AwsRequest,
1408 ) -> Result<AwsResponse, AwsServiceError> {
1409 let serverless_cache_name = optional_param(request, "ServerlessCacheName");
1410 let max_results = optional_usize_param(request, "MaxResults")?;
1411 let next_token = optional_param(request, "NextToken");
1412
1413 let state = self.state.read();
1414 let caches: Vec<&ServerlessCache> = if let Some(ref name) = serverless_cache_name {
1415 match state.serverless_caches.get(name) {
1416 Some(cache) => vec![cache],
1417 None => {
1418 return Err(AwsServiceError::aws_error(
1419 StatusCode::NOT_FOUND,
1420 "ServerlessCacheNotFoundFault",
1421 format!("ServerlessCache {name} not found."),
1422 ));
1423 }
1424 }
1425 } else {
1426 let mut caches: Vec<&ServerlessCache> = state.serverless_caches.values().collect();
1427 caches.sort_by(|a, b| a.serverless_cache_name.cmp(&b.serverless_cache_name));
1428 caches
1429 };
1430
1431 let (page, next_token) = paginate(&caches, next_token.as_deref(), max_results);
1432 let members_xml: String = page
1433 .iter()
1434 .map(|cache| format!("<member>{}</member>", serverless_cache_xml(cache)))
1435 .collect();
1436 let next_token_xml = next_token
1437 .map(|token| format!("<NextToken>{}</NextToken>", xml_escape(&token)))
1438 .unwrap_or_default();
1439
1440 Ok(AwsResponse::xml(
1441 StatusCode::OK,
1442 xml_wrap(
1443 "DescribeServerlessCaches",
1444 &format!("<ServerlessCaches>{members_xml}</ServerlessCaches>{next_token_xml}"),
1445 &request.request_id,
1446 ),
1447 ))
1448 }
1449
1450 async fn delete_serverless_cache(
1451 &self,
1452 request: &AwsRequest,
1453 ) -> Result<AwsResponse, AwsServiceError> {
1454 let serverless_cache_name = required_param(request, "ServerlessCacheName")?;
1455
1456 let cache = {
1457 let mut state = self.state.write();
1458 let cache = state
1459 .serverless_caches
1460 .remove(&serverless_cache_name)
1461 .ok_or_else(|| {
1462 AwsServiceError::aws_error(
1463 StatusCode::NOT_FOUND,
1464 "ServerlessCacheNotFoundFault",
1465 format!("ServerlessCache {serverless_cache_name} not found."),
1466 )
1467 })?;
1468 state.tags.remove(&cache.arn);
1469 cache
1470 };
1471
1472 if let Some(ref runtime) = self.runtime {
1473 runtime.stop_container(&serverless_cache_name).await;
1474 }
1475
1476 let mut deleted = cache;
1477 deleted.status = "deleting".to_string();
1478 let xml = serverless_cache_xml(&deleted);
1479
1480 Ok(AwsResponse::xml(
1481 StatusCode::OK,
1482 xml_wrap(
1483 "DeleteServerlessCache",
1484 &format!("<ServerlessCache>{xml}</ServerlessCache>"),
1485 &request.request_id,
1486 ),
1487 ))
1488 }
1489
1490 fn modify_serverless_cache(
1491 &self,
1492 request: &AwsRequest,
1493 ) -> Result<AwsResponse, AwsServiceError> {
1494 let serverless_cache_name = required_param(request, "ServerlessCacheName")?;
1495 let description = optional_param(request, "Description");
1496 let cache_usage_limits = parse_cache_usage_limits(request)?;
1497 let security_group_ids =
1498 parse_query_list_param(request, "SecurityGroupIds", "SecurityGroupId");
1499 let user_group_id = optional_param(request, "UserGroupId");
1500 let snapshot_retention_limit =
1501 optional_non_negative_i32_param(request, "SnapshotRetentionLimit")?;
1502 let daily_snapshot_time = optional_param(request, "DailySnapshotTime");
1503
1504 let mut state = self.state.write();
1505
1506 if let Some(ref group_id) = user_group_id {
1507 let user_group = state.user_groups.get(group_id).ok_or_else(|| {
1508 AwsServiceError::aws_error(
1509 StatusCode::NOT_FOUND,
1510 "UserGroupNotFound",
1511 format!("User group {group_id} not found."),
1512 )
1513 })?;
1514 if user_group.status != "active" {
1515 return Err(AwsServiceError::aws_error(
1516 StatusCode::BAD_REQUEST,
1517 "InvalidUserGroupState",
1518 format!("User group {group_id} is not in active state."),
1519 ));
1520 }
1521 }
1522
1523 let cache = state
1524 .serverless_caches
1525 .get_mut(&serverless_cache_name)
1526 .ok_or_else(|| {
1527 AwsServiceError::aws_error(
1528 StatusCode::NOT_FOUND,
1529 "ServerlessCacheNotFoundFault",
1530 format!("ServerlessCache {serverless_cache_name} not found."),
1531 )
1532 })?;
1533
1534 if let Some(description) = description {
1535 cache.description = description;
1536 }
1537 if cache_usage_limits.is_some() {
1538 cache.cache_usage_limits = cache_usage_limits;
1539 }
1540 if !security_group_ids.is_empty() {
1541 cache.security_group_ids = security_group_ids;
1542 }
1543 if let Some(user_group_id) = user_group_id {
1544 cache.user_group_id = Some(user_group_id);
1545 }
1546 if let Some(snapshot_retention_limit) = snapshot_retention_limit {
1547 cache.snapshot_retention_limit = Some(snapshot_retention_limit);
1548 }
1549 if let Some(daily_snapshot_time) = daily_snapshot_time {
1550 cache.daily_snapshot_time = Some(daily_snapshot_time);
1551 }
1552
1553 let xml = serverless_cache_xml(cache);
1554 Ok(AwsResponse::xml(
1555 StatusCode::OK,
1556 xml_wrap(
1557 "ModifyServerlessCache",
1558 &format!("<ServerlessCache>{xml}</ServerlessCache>"),
1559 &request.request_id,
1560 ),
1561 ))
1562 }
1563
1564 fn create_serverless_cache_snapshot(
1565 &self,
1566 request: &AwsRequest,
1567 ) -> Result<AwsResponse, AwsServiceError> {
1568 let serverless_cache_name = required_param(request, "ServerlessCacheName")?;
1569 let serverless_cache_snapshot_name =
1570 required_param(request, "ServerlessCacheSnapshotName")?;
1571 let kms_key_id = optional_param(request, "KmsKeyId");
1572 let tags = parse_tags(request)?;
1573
1574 let mut state = self.state.write();
1575 if state
1576 .serverless_cache_snapshots
1577 .contains_key(&serverless_cache_snapshot_name)
1578 {
1579 return Err(AwsServiceError::aws_error(
1580 StatusCode::BAD_REQUEST,
1581 "ServerlessCacheSnapshotAlreadyExistsFault",
1582 format!("ServerlessCacheSnapshot {serverless_cache_snapshot_name} already exists."),
1583 ));
1584 }
1585
1586 let cache = state
1587 .serverless_caches
1588 .get(&serverless_cache_name)
1589 .ok_or_else(|| {
1590 AwsServiceError::aws_error(
1591 StatusCode::NOT_FOUND,
1592 "ServerlessCacheNotFoundFault",
1593 format!("ServerlessCache {serverless_cache_name} not found."),
1594 )
1595 })?;
1596
1597 let arn = format!(
1598 "arn:aws:elasticache:{}:{}:serverlesssnapshot:{}",
1599 state.region, state.account_id, serverless_cache_snapshot_name
1600 );
1601 let snapshot = ServerlessCacheSnapshot {
1602 serverless_cache_snapshot_name: serverless_cache_snapshot_name.clone(),
1603 arn: arn.clone(),
1604 kms_key_id: kms_key_id.or_else(|| cache.kms_key_id.clone()),
1605 snapshot_type: "manual".to_string(),
1606 status: "available".to_string(),
1607 create_time: chrono::Utc::now().to_rfc3339(),
1608 expiry_time: None,
1609 bytes_used_for_cache: None,
1610 serverless_cache_name: cache.serverless_cache_name.clone(),
1611 engine: cache.engine.clone(),
1612 major_engine_version: cache.major_engine_version.clone(),
1613 };
1614
1615 let xml = serverless_cache_snapshot_xml(&snapshot);
1616 state.tags.insert(arn.clone(), Vec::new());
1617 if !tags.is_empty() {
1618 merge_tags(state.tags.entry(arn).or_default(), &tags);
1619 }
1620 state
1621 .serverless_cache_snapshots
1622 .insert(serverless_cache_snapshot_name, snapshot);
1623
1624 Ok(AwsResponse::xml(
1625 StatusCode::OK,
1626 xml_wrap(
1627 "CreateServerlessCacheSnapshot",
1628 &format!("<ServerlessCacheSnapshot>{xml}</ServerlessCacheSnapshot>"),
1629 &request.request_id,
1630 ),
1631 ))
1632 }
1633
1634 fn describe_serverless_cache_snapshots(
1635 &self,
1636 request: &AwsRequest,
1637 ) -> Result<AwsResponse, AwsServiceError> {
1638 let serverless_cache_name = optional_param(request, "ServerlessCacheName");
1639 let serverless_cache_snapshot_name = optional_param(request, "ServerlessCacheSnapshotName");
1640 let snapshot_type = optional_param(request, "SnapshotType");
1641 let max_results = optional_usize_param(request, "MaxResults")?;
1642 let next_token = optional_param(request, "NextToken");
1643
1644 let state = self.state.read();
1645 let snapshots: Vec<&ServerlessCacheSnapshot> =
1646 if let Some(ref snapshot_name) = serverless_cache_snapshot_name {
1647 match state.serverless_cache_snapshots.get(snapshot_name) {
1648 Some(snapshot) => vec![snapshot],
1649 None => {
1650 return Err(AwsServiceError::aws_error(
1651 StatusCode::NOT_FOUND,
1652 "ServerlessCacheSnapshotNotFoundFault",
1653 format!("ServerlessCacheSnapshot {snapshot_name} not found."),
1654 ));
1655 }
1656 }
1657 } else {
1658 if let Some(ref cache_name) = serverless_cache_name {
1659 if !state.serverless_caches.contains_key(cache_name) {
1660 return Err(AwsServiceError::aws_error(
1661 StatusCode::NOT_FOUND,
1662 "ServerlessCacheNotFoundFault",
1663 format!("ServerlessCache {cache_name} not found."),
1664 ));
1665 }
1666 }
1667
1668 let mut snapshots: Vec<&ServerlessCacheSnapshot> = state
1669 .serverless_cache_snapshots
1670 .values()
1671 .filter(|snapshot| {
1672 serverless_cache_name
1673 .as_ref()
1674 .is_none_or(|name| snapshot.serverless_cache_name == *name)
1675 })
1676 .filter(|snapshot| {
1677 snapshot_type
1678 .as_ref()
1679 .is_none_or(|value| snapshot.snapshot_type == *value)
1680 })
1681 .collect();
1682 snapshots.sort_by(|a, b| {
1683 a.serverless_cache_snapshot_name
1684 .cmp(&b.serverless_cache_snapshot_name)
1685 });
1686 snapshots
1687 };
1688
1689 let (page, next_token) = paginate(&snapshots, next_token.as_deref(), max_results);
1690 let members_xml: String = page
1691 .iter()
1692 .map(|snapshot| {
1693 format!(
1694 "<ServerlessCacheSnapshot>{}</ServerlessCacheSnapshot>",
1695 serverless_cache_snapshot_xml(snapshot)
1696 )
1697 })
1698 .collect();
1699 let next_token_xml = next_token
1700 .map(|token| format!("<NextToken>{}</NextToken>", xml_escape(&token)))
1701 .unwrap_or_default();
1702
1703 Ok(AwsResponse::xml(
1704 StatusCode::OK,
1705 xml_wrap(
1706 "DescribeServerlessCacheSnapshots",
1707 &format!(
1708 "<ServerlessCacheSnapshots>{members_xml}</ServerlessCacheSnapshots>{next_token_xml}"
1709 ),
1710 &request.request_id,
1711 ),
1712 ))
1713 }
1714
1715 fn delete_serverless_cache_snapshot(
1716 &self,
1717 request: &AwsRequest,
1718 ) -> Result<AwsResponse, AwsServiceError> {
1719 let serverless_cache_snapshot_name =
1720 required_param(request, "ServerlessCacheSnapshotName")?;
1721
1722 let mut state = self.state.write();
1723 let mut snapshot = state
1724 .serverless_cache_snapshots
1725 .remove(&serverless_cache_snapshot_name)
1726 .ok_or_else(|| {
1727 AwsServiceError::aws_error(
1728 StatusCode::NOT_FOUND,
1729 "ServerlessCacheSnapshotNotFoundFault",
1730 format!("ServerlessCacheSnapshot {serverless_cache_snapshot_name} not found."),
1731 )
1732 })?;
1733 state.tags.remove(&snapshot.arn);
1734
1735 snapshot.status = "deleting".to_string();
1736 let xml = serverless_cache_snapshot_xml(&snapshot);
1737
1738 Ok(AwsResponse::xml(
1739 StatusCode::OK,
1740 xml_wrap(
1741 "DeleteServerlessCacheSnapshot",
1742 &format!("<ServerlessCacheSnapshot>{xml}</ServerlessCacheSnapshot>"),
1743 &request.request_id,
1744 ),
1745 ))
1746 }
1747
1748 fn create_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1749 let snapshot_name = required_param(request, "SnapshotName")?;
1750 let replication_group_id = optional_param(request, "ReplicationGroupId");
1751 let cache_cluster_id = optional_param(request, "CacheClusterId");
1752
1753 if replication_group_id.is_none() && cache_cluster_id.is_none() {
1754 return Err(AwsServiceError::aws_error(
1755 StatusCode::BAD_REQUEST,
1756 "InvalidParameterCombination",
1757 "At least one of ReplicationGroupId or CacheClusterId must be specified."
1758 .to_string(),
1759 ));
1760 }
1761
1762 let mut state = self.state.write();
1763
1764 if state.snapshots.contains_key(&snapshot_name) {
1765 return Err(AwsServiceError::aws_error(
1766 StatusCode::BAD_REQUEST,
1767 "SnapshotAlreadyExistsFault",
1768 format!("Snapshot {snapshot_name} already exists."),
1769 ));
1770 }
1771
1772 let group_id = if let Some(ref rg_id) = replication_group_id {
1774 rg_id.clone()
1775 } else {
1776 let cluster_id = cache_cluster_id.as_ref().unwrap();
1777 if let Some(cluster) = state.cache_clusters.get(cluster_id) {
1778 if let Some(group_id) = cluster.replication_group_id.clone() {
1779 group_id
1780 } else {
1781 return Err(AwsServiceError::aws_error(
1782 StatusCode::BAD_REQUEST,
1783 "InvalidParameterCombination",
1784 format!(
1785 "CacheCluster {cluster_id} is not associated with a replication group."
1786 ),
1787 ));
1788 }
1789 } else {
1790 state
1792 .replication_groups
1793 .values()
1794 .find(|g| g.member_clusters.contains(cluster_id))
1795 .map(|g| g.replication_group_id.clone())
1796 .ok_or_else(|| {
1797 AwsServiceError::aws_error(
1798 StatusCode::NOT_FOUND,
1799 "CacheClusterNotFound",
1800 format!("CacheCluster {cluster_id} not found."),
1801 )
1802 })?
1803 }
1804 };
1805
1806 let group = state.replication_groups.get(&group_id).ok_or_else(|| {
1807 AwsServiceError::aws_error(
1808 StatusCode::NOT_FOUND,
1809 "ReplicationGroupNotFoundFault",
1810 format!("ReplicationGroup {group_id} not found."),
1811 )
1812 })?;
1813
1814 let arn = format!(
1815 "arn:aws:elasticache:{}:{}:snapshot:{}",
1816 state.region, state.account_id, snapshot_name
1817 );
1818
1819 let snapshot = CacheSnapshot {
1820 snapshot_name: snapshot_name.clone(),
1821 replication_group_id: group.replication_group_id.clone(),
1822 replication_group_description: group.description.clone(),
1823 snapshot_status: "available".to_string(),
1824 cache_node_type: group.cache_node_type.clone(),
1825 engine: group.engine.clone(),
1826 engine_version: group.engine_version.clone(),
1827 num_cache_clusters: group.num_cache_clusters,
1828 arn: arn.clone(),
1829 created_at: chrono::Utc::now().to_rfc3339(),
1830 snapshot_source: "manual".to_string(),
1831 };
1832
1833 let xml = snapshot_xml(&snapshot);
1834 state.tags.insert(arn, Vec::new());
1835 state.snapshots.insert(snapshot_name, snapshot);
1836
1837 Ok(AwsResponse::xml(
1838 StatusCode::OK,
1839 xml_wrap(
1840 "CreateSnapshot",
1841 &format!("<Snapshot>{xml}</Snapshot>"),
1842 &request.request_id,
1843 ),
1844 ))
1845 }
1846
1847 fn describe_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1848 let snapshot_name = optional_param(request, "SnapshotName");
1849 let replication_group_id = optional_param(request, "ReplicationGroupId");
1850 let cache_cluster_id = optional_param(request, "CacheClusterId");
1851 let max_records = optional_usize_param(request, "MaxRecords")?;
1852 let marker = optional_param(request, "Marker");
1853
1854 let state = self.state.read();
1855
1856 let snapshots: Vec<&CacheSnapshot> = if let Some(ref name) = snapshot_name {
1857 match state.snapshots.get(name) {
1858 Some(s) => vec![s],
1859 None => {
1860 return Err(AwsServiceError::aws_error(
1861 StatusCode::NOT_FOUND,
1862 "SnapshotNotFoundFault",
1863 format!("Snapshot {name} not found."),
1864 ));
1865 }
1866 }
1867 } else {
1868 let mut snaps: Vec<&CacheSnapshot> = state
1869 .snapshots
1870 .values()
1871 .filter(|s| {
1872 replication_group_id
1873 .as_ref()
1874 .is_none_or(|id| s.replication_group_id == *id)
1875 })
1876 .filter(|s| {
1877 cache_cluster_id.as_ref().is_none_or(|cluster_id| {
1878 state.cache_clusters.get(cluster_id).is_some_and(|cluster| {
1879 cluster.replication_group_id.as_deref() == Some(&s.replication_group_id)
1880 }) || state
1881 .replication_groups
1882 .get(&s.replication_group_id)
1883 .is_some_and(|g| g.member_clusters.contains(cluster_id))
1884 })
1885 })
1886 .collect();
1887 snaps.sort_by(|a, b| a.snapshot_name.cmp(&b.snapshot_name));
1888 snaps
1889 };
1890
1891 let (page, next_marker) = paginate(&snapshots, marker.as_deref(), max_records);
1892
1893 let members_xml: String = page
1894 .iter()
1895 .map(|s| format!("<Snapshot>{}</Snapshot>", snapshot_xml(s)))
1896 .collect();
1897 let marker_xml = next_marker
1898 .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
1899 .unwrap_or_default();
1900
1901 Ok(AwsResponse::xml(
1902 StatusCode::OK,
1903 xml_wrap(
1904 "DescribeSnapshots",
1905 &format!("<Snapshots>{members_xml}</Snapshots>{marker_xml}"),
1906 &request.request_id,
1907 ),
1908 ))
1909 }
1910
1911 fn delete_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1912 let snapshot_name = required_param(request, "SnapshotName")?;
1913
1914 let mut state = self.state.write();
1915 let mut snapshot = state.snapshots.remove(&snapshot_name).ok_or_else(|| {
1916 AwsServiceError::aws_error(
1917 StatusCode::NOT_FOUND,
1918 "SnapshotNotFoundFault",
1919 format!("Snapshot {snapshot_name} not found."),
1920 )
1921 })?;
1922 state.tags.remove(&snapshot.arn);
1923
1924 snapshot.snapshot_status = "deleting".to_string();
1925 let xml = snapshot_xml(&snapshot);
1926
1927 Ok(AwsResponse::xml(
1928 StatusCode::OK,
1929 xml_wrap(
1930 "DeleteSnapshot",
1931 &format!("<Snapshot>{xml}</Snapshot>"),
1932 &request.request_id,
1933 ),
1934 ))
1935 }
1936
1937 fn modify_replication_group(
1938 &self,
1939 request: &AwsRequest,
1940 ) -> Result<AwsResponse, AwsServiceError> {
1941 let replication_group_id = required_param(request, "ReplicationGroupId")?;
1942
1943 let new_description = optional_param(request, "ReplicationGroupDescription");
1944 let new_cache_node_type = optional_param(request, "CacheNodeType");
1945 let new_engine_version = optional_param(request, "EngineVersion");
1946 let new_automatic_failover =
1947 parse_optional_bool(optional_param(request, "AutomaticFailoverEnabled").as_deref())?;
1948 let new_snapshot_retention_limit = optional_param(request, "SnapshotRetentionLimit")
1949 .map(|v| {
1950 let val = v.parse::<i32>().map_err(|_| {
1951 AwsServiceError::aws_error(
1952 StatusCode::BAD_REQUEST,
1953 "InvalidParameterValue",
1954 format!("Invalid value for SnapshotRetentionLimit: '{v}'"),
1955 )
1956 })?;
1957 if val < 0 {
1958 return Err(AwsServiceError::aws_error(
1959 StatusCode::BAD_REQUEST,
1960 "InvalidParameterValue",
1961 format!("SnapshotRetentionLimit must be non-negative, got {val}"),
1962 ));
1963 }
1964 Ok(val)
1965 })
1966 .transpose()?;
1967 let new_snapshot_window = optional_param(request, "SnapshotWindow");
1968 let user_group_ids_to_add =
1969 parse_member_list(&request.query_params, "UserGroupIdsToAdd", "member");
1970 let user_group_ids_to_remove =
1971 parse_member_list(&request.query_params, "UserGroupIdsToRemove", "member");
1972
1973 let mut state = self.state.write();
1974
1975 let group = state
1976 .replication_groups
1977 .get_mut(&replication_group_id)
1978 .ok_or_else(|| {
1979 AwsServiceError::aws_error(
1980 StatusCode::NOT_FOUND,
1981 "ReplicationGroupNotFoundFault",
1982 format!("ReplicationGroup {replication_group_id} not found."),
1983 )
1984 })?;
1985
1986 if let Some(desc) = new_description {
1987 group.description = desc;
1988 }
1989 if let Some(node_type) = new_cache_node_type {
1990 group.cache_node_type = node_type;
1991 }
1992 if let Some(version) = new_engine_version {
1993 group.engine_version = version;
1994 }
1995 if let Some(af) = new_automatic_failover {
1996 group.automatic_failover_enabled = af;
1997 }
1998 if let Some(limit) = new_snapshot_retention_limit {
1999 group.snapshot_retention_limit = limit;
2000 }
2001 if let Some(window) = new_snapshot_window {
2002 group.snapshot_window = window;
2003 }
2004
2005 for ug_id in &user_group_ids_to_add {
2007 if let Some(ug) = state.user_groups.get_mut(ug_id) {
2008 if !ug.replication_groups.contains(&replication_group_id) {
2009 ug.replication_groups.push(replication_group_id.clone());
2010 }
2011 }
2012 }
2013 for ug_id in &user_group_ids_to_remove {
2014 if let Some(ug) = state.user_groups.get_mut(ug_id) {
2015 ug.replication_groups
2016 .retain(|id| id != &replication_group_id);
2017 }
2018 }
2019
2020 let group = state.replication_groups[&replication_group_id].clone();
2021 let region = state.region.clone();
2022 let xml = replication_group_xml(&group, ®ion);
2023
2024 Ok(AwsResponse::xml(
2025 StatusCode::OK,
2026 xml_wrap(
2027 "ModifyReplicationGroup",
2028 &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
2029 &request.request_id,
2030 ),
2031 ))
2032 }
2033
2034 fn modify_global_replication_group(
2035 &self,
2036 request: &AwsRequest,
2037 ) -> Result<AwsResponse, AwsServiceError> {
2038 let global_replication_group_id = required_param(request, "GlobalReplicationGroupId")?;
2039 let _apply_immediately = parse_required_bool(request, "ApplyImmediately")?;
2040 let new_description = optional_param(request, "GlobalReplicationGroupDescription");
2041 let new_cache_node_type = optional_param(request, "CacheNodeType");
2042 let new_engine = optional_param(request, "Engine");
2043 let new_engine_version = optional_param(request, "EngineVersion");
2044 let new_automatic_failover =
2045 parse_optional_bool(optional_param(request, "AutomaticFailoverEnabled").as_deref())?;
2046
2047 let mut state = self.state.write();
2048 let primary_replication_group_id = state
2049 .global_replication_groups
2050 .get(&global_replication_group_id)
2051 .and_then(primary_global_member)
2052 .map(|member| member.replication_group_id.clone())
2053 .ok_or_else(|| {
2054 AwsServiceError::aws_error(
2055 StatusCode::NOT_FOUND,
2056 "GlobalReplicationGroupNotFoundFault",
2057 format!("GlobalReplicationGroup {global_replication_group_id} not found."),
2058 )
2059 })?;
2060
2061 if let Some(ref engine) = new_engine {
2062 validate_serverless_engine(engine)?;
2063 let current_engine =
2064 &state.global_replication_groups[&global_replication_group_id].engine;
2065 if engine != current_engine {
2066 return Err(AwsServiceError::aws_error(
2067 StatusCode::BAD_REQUEST,
2068 "InvalidParameterValue",
2069 format!(
2070 "Engine changes are not supported for GlobalReplicationGroup {global_replication_group_id}."
2071 ),
2072 ));
2073 }
2074 }
2075
2076 if let Some(primary_group) = state
2077 .replication_groups
2078 .get_mut(&primary_replication_group_id)
2079 {
2080 if let Some(cache_node_type) = new_cache_node_type.clone() {
2081 primary_group.cache_node_type = cache_node_type;
2082 }
2083 if let Some(engine_version) = new_engine_version.clone() {
2084 primary_group.engine_version = engine_version;
2085 }
2086 if let Some(automatic_failover) = new_automatic_failover {
2087 primary_group.automatic_failover_enabled = automatic_failover;
2088 }
2089 }
2090
2091 let primary_group = state.replication_groups[&primary_replication_group_id].clone();
2092 let group = state
2093 .global_replication_groups
2094 .get_mut(&global_replication_group_id)
2095 .expect("global replication group exists");
2096 if let Some(description) = new_description {
2097 group.global_replication_group_description = description;
2098 }
2099 group.cache_node_type = primary_group.cache_node_type.clone();
2100 group.engine = primary_group.engine.clone();
2101 group.engine_version = primary_group.engine_version.clone();
2102 if let Some(member) = group
2103 .members
2104 .iter_mut()
2105 .find(|member| member.role == "primary")
2106 {
2107 member.automatic_failover = primary_group.automatic_failover_enabled;
2108 }
2109
2110 let xml = global_replication_group_xml(group, true);
2111 Ok(AwsResponse::xml(
2112 StatusCode::OK,
2113 xml_wrap(
2114 "ModifyGlobalReplicationGroup",
2115 &format!("<GlobalReplicationGroup>{xml}</GlobalReplicationGroup>"),
2116 &request.request_id,
2117 ),
2118 ))
2119 }
2120
2121 fn increase_replica_count(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2122 let replication_group_id = required_param(request, "ReplicationGroupId")?;
2123 let apply_str = required_param(request, "ApplyImmediately")?;
2124 let _apply_immediately = parse_optional_bool(Some(&apply_str))?.ok_or_else(|| {
2125 AwsServiceError::aws_error(
2126 StatusCode::BAD_REQUEST,
2127 "InvalidParameterValue",
2128 format!(
2129 "Invalid boolean value for ApplyImmediately: '{}'",
2130 apply_str
2131 ),
2132 )
2133 })?;
2134
2135 let new_replica_count = optional_param(request, "NewReplicaCount")
2136 .map(|v| {
2137 v.parse::<i32>().map_err(|_| {
2138 AwsServiceError::aws_error(
2139 StatusCode::BAD_REQUEST,
2140 "InvalidParameterValue",
2141 format!("Invalid value for NewReplicaCount: '{v}'"),
2142 )
2143 })
2144 })
2145 .transpose()?
2146 .ok_or_else(|| {
2147 AwsServiceError::aws_error(
2148 StatusCode::BAD_REQUEST,
2149 "MissingParameter",
2150 "The request must contain the parameter NewReplicaCount.".to_string(),
2151 )
2152 })?;
2153
2154 if new_replica_count < 1 {
2155 return Err(AwsServiceError::aws_error(
2156 StatusCode::BAD_REQUEST,
2157 "InvalidParameterValue",
2158 format!("NewReplicaCount must be a positive integer, got {new_replica_count}"),
2159 ));
2160 }
2161
2162 let mut state = self.state.write();
2163
2164 let group = state
2165 .replication_groups
2166 .get_mut(&replication_group_id)
2167 .ok_or_else(|| {
2168 AwsServiceError::aws_error(
2169 StatusCode::NOT_FOUND,
2170 "ReplicationGroupNotFoundFault",
2171 format!("ReplicationGroup {replication_group_id} not found."),
2172 )
2173 })?;
2174
2175 let new_total = new_replica_count.checked_add(1).ok_or_else(|| {
2177 AwsServiceError::aws_error(
2178 StatusCode::BAD_REQUEST,
2179 "InvalidParameterValue",
2180 format!("NewReplicaCount value {new_replica_count} is too large"),
2181 )
2182 })?;
2183 if new_total <= group.num_cache_clusters {
2184 return Err(AwsServiceError::aws_error(
2185 StatusCode::BAD_REQUEST,
2186 "InvalidParameterValue",
2187 format!(
2188 "NewReplicaCount ({new_replica_count}) must result in more clusters than current count ({}).",
2189 group.num_cache_clusters
2190 ),
2191 ));
2192 }
2193
2194 group.num_cache_clusters = new_total;
2195 group.member_clusters = (1..=new_total)
2196 .map(|i| format!("{replication_group_id}-{i:03}"))
2197 .collect();
2198
2199 let group = group.clone();
2200 let region = state.region.clone();
2201 let xml = replication_group_xml(&group, ®ion);
2202
2203 Ok(AwsResponse::xml(
2204 StatusCode::OK,
2205 xml_wrap(
2206 "IncreaseReplicaCount",
2207 &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
2208 &request.request_id,
2209 ),
2210 ))
2211 }
2212
2213 fn decrease_replica_count(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2214 let replication_group_id = required_param(request, "ReplicationGroupId")?;
2215 let apply_str = required_param(request, "ApplyImmediately")?;
2216 let _apply_immediately = parse_optional_bool(Some(&apply_str))?.ok_or_else(|| {
2217 AwsServiceError::aws_error(
2218 StatusCode::BAD_REQUEST,
2219 "InvalidParameterValue",
2220 format!(
2221 "Invalid boolean value for ApplyImmediately: '{}'",
2222 apply_str
2223 ),
2224 )
2225 })?;
2226
2227 let new_replica_count = optional_param(request, "NewReplicaCount")
2228 .map(|v| {
2229 v.parse::<i32>().map_err(|_| {
2230 AwsServiceError::aws_error(
2231 StatusCode::BAD_REQUEST,
2232 "InvalidParameterValue",
2233 format!("Invalid value for NewReplicaCount: '{v}'"),
2234 )
2235 })
2236 })
2237 .transpose()?
2238 .ok_or_else(|| {
2239 AwsServiceError::aws_error(
2240 StatusCode::BAD_REQUEST,
2241 "MissingParameter",
2242 "The request must contain the parameter NewReplicaCount.".to_string(),
2243 )
2244 })?;
2245
2246 if new_replica_count < 0 {
2247 return Err(AwsServiceError::aws_error(
2248 StatusCode::BAD_REQUEST,
2249 "InvalidParameterValue",
2250 format!("NewReplicaCount must be non-negative, got {new_replica_count}"),
2251 ));
2252 }
2253
2254 let mut state = self.state.write();
2255
2256 let group = state
2257 .replication_groups
2258 .get_mut(&replication_group_id)
2259 .ok_or_else(|| {
2260 AwsServiceError::aws_error(
2261 StatusCode::NOT_FOUND,
2262 "ReplicationGroupNotFoundFault",
2263 format!("ReplicationGroup {replication_group_id} not found."),
2264 )
2265 })?;
2266
2267 let new_total = new_replica_count.checked_add(1).ok_or_else(|| {
2269 AwsServiceError::aws_error(
2270 StatusCode::BAD_REQUEST,
2271 "InvalidParameterValue",
2272 format!("NewReplicaCount value {new_replica_count} is too large"),
2273 )
2274 })?;
2275 if new_total >= group.num_cache_clusters {
2276 return Err(AwsServiceError::aws_error(
2277 StatusCode::BAD_REQUEST,
2278 "InvalidParameterValue",
2279 format!(
2280 "NewReplicaCount ({new_replica_count}) must result in fewer clusters than current count ({}).",
2281 group.num_cache_clusters
2282 ),
2283 ));
2284 }
2285
2286 group.num_cache_clusters = new_total;
2287 group.member_clusters = (1..=new_total)
2288 .map(|i| format!("{replication_group_id}-{i:03}"))
2289 .collect();
2290
2291 let group = group.clone();
2292 let region = state.region.clone();
2293 let xml = replication_group_xml(&group, ®ion);
2294
2295 Ok(AwsResponse::xml(
2296 StatusCode::OK,
2297 xml_wrap(
2298 "DecreaseReplicaCount",
2299 &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
2300 &request.request_id,
2301 ),
2302 ))
2303 }
2304
2305 fn test_failover(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2306 let replication_group_id = required_param(request, "ReplicationGroupId")?;
2307 let node_group_id = required_param(request, "NodeGroupId")?;
2308
2309 let state = self.state.read();
2310
2311 let group = state
2312 .replication_groups
2313 .get(&replication_group_id)
2314 .ok_or_else(|| {
2315 AwsServiceError::aws_error(
2316 StatusCode::NOT_FOUND,
2317 "ReplicationGroupNotFoundFault",
2318 format!("ReplicationGroup {replication_group_id} not found."),
2319 )
2320 })?;
2321
2322 if node_group_id != "0001" {
2324 return Err(AwsServiceError::aws_error(
2325 StatusCode::NOT_FOUND,
2326 "NodeGroupNotFoundFault",
2327 format!("NodeGroup {node_group_id} not found in ReplicationGroup {replication_group_id}."),
2328 ));
2329 }
2330
2331 let region = state.region.clone();
2332 let xml = replication_group_xml(group, ®ion);
2333
2334 Ok(AwsResponse::xml(
2335 StatusCode::OK,
2336 xml_wrap(
2337 "TestFailover",
2338 &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
2339 &request.request_id,
2340 ),
2341 ))
2342 }
2343
2344 fn disassociate_global_replication_group(
2345 &self,
2346 request: &AwsRequest,
2347 ) -> Result<AwsResponse, AwsServiceError> {
2348 let global_replication_group_id = required_param(request, "GlobalReplicationGroupId")?;
2349 let replication_group_id = required_param(request, "ReplicationGroupId")?;
2350 let replication_group_region = required_param(request, "ReplicationGroupRegion")?;
2351
2352 let state = self.state.read();
2353 let group = state
2354 .global_replication_groups
2355 .get(&global_replication_group_id)
2356 .ok_or_else(|| {
2357 AwsServiceError::aws_error(
2358 StatusCode::NOT_FOUND,
2359 "GlobalReplicationGroupNotFoundFault",
2360 format!("GlobalReplicationGroup {global_replication_group_id} not found."),
2361 )
2362 })?;
2363
2364 let primary_member = primary_global_member(group).ok_or_else(|| {
2365 AwsServiceError::aws_error(
2366 StatusCode::BAD_REQUEST,
2367 "InvalidGlobalReplicationGroupState",
2368 format!(
2369 "GlobalReplicationGroup {global_replication_group_id} does not have a primary member."
2370 ),
2371 )
2372 })?;
2373 if primary_member.replication_group_id != replication_group_id
2374 || primary_member.replication_group_region != replication_group_region
2375 {
2376 return Err(AwsServiceError::aws_error(
2377 StatusCode::BAD_REQUEST,
2378 "InvalidParameterValue",
2379 format!(
2380 "ReplicationGroup {replication_group_id} in region {replication_group_region} is not associated with GlobalReplicationGroup {global_replication_group_id}."
2381 ),
2382 ));
2383 }
2384
2385 let xml = global_replication_group_xml(group, true);
2386 Ok(AwsResponse::xml(
2387 StatusCode::OK,
2388 xml_wrap(
2389 "DisassociateGlobalReplicationGroup",
2390 &format!("<GlobalReplicationGroup>{xml}</GlobalReplicationGroup>"),
2391 &request.request_id,
2392 ),
2393 ))
2394 }
2395
2396 fn failover_global_replication_group(
2397 &self,
2398 request: &AwsRequest,
2399 ) -> Result<AwsResponse, AwsServiceError> {
2400 let global_replication_group_id = required_param(request, "GlobalReplicationGroupId")?;
2401 let primary_region = required_param(request, "PrimaryRegion")?;
2402 let primary_replication_group_id = required_param(request, "PrimaryReplicationGroupId")?;
2403
2404 let state = self.state.read();
2405 let group = state
2406 .global_replication_groups
2407 .get(&global_replication_group_id)
2408 .ok_or_else(|| {
2409 AwsServiceError::aws_error(
2410 StatusCode::NOT_FOUND,
2411 "GlobalReplicationGroupNotFoundFault",
2412 format!("GlobalReplicationGroup {global_replication_group_id} not found."),
2413 )
2414 })?;
2415
2416 let primary_member = primary_global_member(group).ok_or_else(|| {
2417 AwsServiceError::aws_error(
2418 StatusCode::BAD_REQUEST,
2419 "InvalidGlobalReplicationGroupState",
2420 format!(
2421 "GlobalReplicationGroup {global_replication_group_id} does not have a primary member."
2422 ),
2423 )
2424 })?;
2425 if primary_member.replication_group_id != primary_replication_group_id
2426 || primary_member.replication_group_region != primary_region
2427 {
2428 return Err(AwsServiceError::aws_error(
2429 StatusCode::BAD_REQUEST,
2430 "InvalidParameterValue",
2431 format!(
2432 "PrimaryReplicationGroupId and PrimaryRegion do not match the current primary for GlobalReplicationGroup {global_replication_group_id}."
2433 ),
2434 ));
2435 }
2436
2437 let xml = global_replication_group_xml(group, true);
2438 Ok(AwsResponse::xml(
2439 StatusCode::OK,
2440 xml_wrap(
2441 "FailoverGlobalReplicationGroup",
2442 &format!("<GlobalReplicationGroup>{xml}</GlobalReplicationGroup>"),
2443 &request.request_id,
2444 ),
2445 ))
2446 }
2447
2448 fn create_user(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2449 let user_id = required_param(request, "UserId")?;
2450 let user_name = required_param(request, "UserName")?;
2451 let engine = required_param(request, "Engine")?;
2452 let access_string = required_param(request, "AccessString")?;
2453
2454 validate_engine(&engine)?;
2455
2456 let no_password_required =
2457 parse_optional_bool(optional_param(request, "NoPasswordRequired").as_deref())?
2458 .unwrap_or(false);
2459 let passwords = parse_member_list(&request.query_params, "Passwords", "member");
2460 let auth_mode_type = optional_param(request, "AuthenticationMode.Type");
2461
2462 let (authentication_type, password_count) = if no_password_required {
2463 if !passwords.is_empty() {
2464 return Err(AwsServiceError::aws_error(
2465 StatusCode::BAD_REQUEST,
2466 "InvalidParameterCombination",
2467 "Passwords cannot be provided when NoPasswordRequired is true.".to_string(),
2468 ));
2469 }
2470 ("no-password".to_string(), 0)
2471 } else if let Some(ref mode) = auth_mode_type {
2472 let mode_passwords = parse_member_list(
2473 &request.query_params,
2474 "AuthenticationMode.Passwords",
2475 "member",
2476 );
2477 match mode.as_str() {
2478 "password" => {
2479 if mode_passwords.is_empty() {
2480 return Err(AwsServiceError::aws_error(
2481 StatusCode::BAD_REQUEST,
2482 "InvalidParameterValue",
2483 "At least one password is required when AuthenticationMode.Type is password.".to_string(),
2484 ));
2485 }
2486 ("password".to_string(), mode_passwords.len() as i32)
2487 }
2488 "no-password-required" | "iam" => {
2489 if !mode_passwords.is_empty() {
2490 return Err(AwsServiceError::aws_error(
2491 StatusCode::BAD_REQUEST,
2492 "InvalidParameterValue",
2493 format!("Passwords cannot be provided when AuthenticationMode.Type is {mode}."),
2494 ));
2495 }
2496 (mode.clone(), 0)
2497 }
2498 _ => {
2499 return Err(AwsServiceError::aws_error(
2500 StatusCode::BAD_REQUEST,
2501 "InvalidParameterValue",
2502 format!("Invalid value for AuthenticationMode.Type: {mode}. Supported values: password, iam, no-password-required"),
2503 ));
2504 }
2505 }
2506 } else if !passwords.is_empty() {
2507 ("password".to_string(), passwords.len() as i32)
2508 } else {
2509 ("no-password".to_string(), 0)
2510 };
2511
2512 let minimum_engine_version = if engine == ENGINE_VALKEY {
2513 "8.0".to_string()
2514 } else {
2515 "6.0".to_string()
2516 };
2517
2518 let mut state = self.state.write();
2519
2520 if state.users.contains_key(&user_id) {
2521 return Err(AwsServiceError::aws_error(
2522 StatusCode::BAD_REQUEST,
2523 "UserAlreadyExistsFault",
2524 format!("User {user_id} already exists."),
2525 ));
2526 }
2527
2528 let arn = format!(
2529 "arn:aws:elasticache:{}:{}:user:{}",
2530 state.region, state.account_id, user_id
2531 );
2532
2533 let user = ElastiCacheUser {
2534 user_id: user_id.clone(),
2535 user_name,
2536 engine,
2537 access_string,
2538 status: "active".to_string(),
2539 authentication_type,
2540 password_count,
2541 arn,
2542 minimum_engine_version,
2543 user_group_ids: Vec::new(),
2544 };
2545
2546 let xml = user_xml(&user);
2547 state.register_arn(&user.arn);
2548 state.users.insert(user_id, user);
2549
2550 Ok(AwsResponse::xml(
2551 StatusCode::OK,
2552 xml_wrap("CreateUser", &xml, &request.request_id),
2553 ))
2554 }
2555
2556 fn describe_users(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2557 let user_id = optional_param(request, "UserId");
2558 let max_records = optional_usize_param(request, "MaxRecords")?;
2559 let marker = optional_param(request, "Marker");
2560
2561 let state = self.state.read();
2562
2563 let users: Vec<&ElastiCacheUser> = if let Some(ref id) = user_id {
2564 match state.users.get(id) {
2565 Some(u) => vec![u],
2566 None => {
2567 return Err(AwsServiceError::aws_error(
2568 StatusCode::NOT_FOUND,
2569 "UserNotFoundFault",
2570 format!("User {id} not found."),
2571 ));
2572 }
2573 }
2574 } else {
2575 let mut users: Vec<&ElastiCacheUser> = state.users.values().collect();
2576 users.sort_by(|a, b| a.user_id.cmp(&b.user_id));
2577 users
2578 };
2579
2580 let (page, next_marker) = paginate(&users, marker.as_deref(), max_records);
2581
2582 let members_xml: String = page
2583 .iter()
2584 .map(|u| format!("<member>{}</member>", user_xml(u)))
2585 .collect();
2586 let marker_xml = next_marker
2587 .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
2588 .unwrap_or_default();
2589
2590 Ok(AwsResponse::xml(
2591 StatusCode::OK,
2592 xml_wrap(
2593 "DescribeUsers",
2594 &format!("<Users>{members_xml}</Users>{marker_xml}"),
2595 &request.request_id,
2596 ),
2597 ))
2598 }
2599
2600 fn delete_user(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2601 let user_id = required_param(request, "UserId")?;
2602
2603 if user_id == "default" {
2604 return Err(AwsServiceError::aws_error(
2605 StatusCode::BAD_REQUEST,
2606 "InvalidParameterValue",
2607 "Cannot delete the default user.".to_string(),
2608 ));
2609 }
2610
2611 let mut state = self.state.write();
2612
2613 let user = state.users.remove(&user_id).ok_or_else(|| {
2614 AwsServiceError::aws_error(
2615 StatusCode::NOT_FOUND,
2616 "UserNotFoundFault",
2617 format!("User {user_id} not found."),
2618 )
2619 })?;
2620
2621 state.tags.remove(&user.arn);
2622
2623 for group in state.user_groups.values_mut() {
2625 group.user_ids.retain(|id| id != &user_id);
2626 }
2627
2628 let mut deleted_user = user;
2629 deleted_user.status = "deleting".to_string();
2630 let xml = user_xml(&deleted_user);
2631
2632 Ok(AwsResponse::xml(
2633 StatusCode::OK,
2634 xml_wrap("DeleteUser", &xml, &request.request_id),
2635 ))
2636 }
2637
2638 fn create_user_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2639 let user_group_id = required_param(request, "UserGroupId")?;
2640 let engine = required_param(request, "Engine")?;
2641
2642 validate_engine(&engine)?;
2643
2644 let user_ids = parse_member_list(&request.query_params, "UserIds", "member");
2645
2646 let minimum_engine_version = if engine == ENGINE_VALKEY {
2647 "8.0".to_string()
2648 } else {
2649 "6.0".to_string()
2650 };
2651
2652 let mut state = self.state.write();
2653
2654 if state.user_groups.contains_key(&user_group_id) {
2655 return Err(AwsServiceError::aws_error(
2656 StatusCode::BAD_REQUEST,
2657 "UserGroupAlreadyExistsFault",
2658 format!("User Group {user_group_id} already exists."),
2659 ));
2660 }
2661
2662 for uid in &user_ids {
2664 match state.users.get(uid) {
2665 None => {
2666 return Err(AwsServiceError::aws_error(
2667 StatusCode::NOT_FOUND,
2668 "UserNotFoundFault",
2669 format!("User {uid} not found."),
2670 ));
2671 }
2672 Some(user) if user.engine != engine => {
2673 return Err(AwsServiceError::aws_error(
2674 StatusCode::BAD_REQUEST,
2675 "InvalidParameterValue",
2676 format!(
2677 "User {uid} has engine {} which does not match the user group engine {engine}.",
2678 user.engine
2679 ),
2680 ));
2681 }
2682 _ => {}
2683 }
2684 }
2685
2686 let arn = format!(
2687 "arn:aws:elasticache:{}:{}:usergroup:{}",
2688 state.region, state.account_id, user_group_id
2689 );
2690
2691 let group = ElastiCacheUserGroup {
2692 user_group_id: user_group_id.clone(),
2693 engine,
2694 status: "active".to_string(),
2695 user_ids: user_ids.clone(),
2696 arn,
2697 minimum_engine_version,
2698 pending_changes: None,
2699 replication_groups: Vec::new(),
2700 };
2701
2702 for uid in &user_ids {
2704 if let Some(user) = state.users.get_mut(uid) {
2705 user.user_group_ids.push(user_group_id.clone());
2706 }
2707 }
2708
2709 let xml = user_group_xml(&group);
2710 state.register_arn(&group.arn);
2711 state.user_groups.insert(user_group_id, group);
2712
2713 Ok(AwsResponse::xml(
2714 StatusCode::OK,
2715 xml_wrap("CreateUserGroup", &xml, &request.request_id),
2716 ))
2717 }
2718
2719 fn describe_user_groups(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2720 let user_group_id = optional_param(request, "UserGroupId");
2721 let max_records = optional_usize_param(request, "MaxRecords")?;
2722 let marker = optional_param(request, "Marker");
2723
2724 let state = self.state.read();
2725
2726 let groups: Vec<&ElastiCacheUserGroup> = if let Some(ref id) = user_group_id {
2727 match state.user_groups.get(id) {
2728 Some(g) => vec![g],
2729 None => {
2730 return Err(AwsServiceError::aws_error(
2731 StatusCode::NOT_FOUND,
2732 "UserGroupNotFoundFault",
2733 format!("User Group {id} not found."),
2734 ));
2735 }
2736 }
2737 } else {
2738 let mut groups: Vec<&ElastiCacheUserGroup> = state.user_groups.values().collect();
2739 groups.sort_by(|a, b| a.user_group_id.cmp(&b.user_group_id));
2740 groups
2741 };
2742
2743 let (page, next_marker) = paginate(&groups, marker.as_deref(), max_records);
2744
2745 let members_xml: String = page
2746 .iter()
2747 .map(|g| format!("<member>{}</member>", user_group_xml(g)))
2748 .collect();
2749 let marker_xml = next_marker
2750 .map(|m| format!("<Marker>{}</Marker>", xml_escape(&m)))
2751 .unwrap_or_default();
2752
2753 Ok(AwsResponse::xml(
2754 StatusCode::OK,
2755 xml_wrap(
2756 "DescribeUserGroups",
2757 &format!("<UserGroups>{members_xml}</UserGroups>{marker_xml}"),
2758 &request.request_id,
2759 ),
2760 ))
2761 }
2762
2763 fn delete_user_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2764 let user_group_id = required_param(request, "UserGroupId")?;
2765
2766 let mut state = self.state.write();
2767
2768 let group = state.user_groups.remove(&user_group_id).ok_or_else(|| {
2769 AwsServiceError::aws_error(
2770 StatusCode::NOT_FOUND,
2771 "UserGroupNotFoundFault",
2772 format!("User Group {user_group_id} not found."),
2773 )
2774 })?;
2775
2776 state.tags.remove(&group.arn);
2777
2778 for uid in &group.user_ids {
2780 if let Some(user) = state.users.get_mut(uid) {
2781 user.user_group_ids.retain(|gid| gid != &user_group_id);
2782 }
2783 }
2784
2785 let mut deleted_group = group;
2786 deleted_group.status = "deleting".to_string();
2787 let xml = user_group_xml(&deleted_group);
2788
2789 Ok(AwsResponse::xml(
2790 StatusCode::OK,
2791 xml_wrap("DeleteUserGroup", &xml, &request.request_id),
2792 ))
2793 }
2794
2795 fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2796 let resource_name = required_param(request, "ResourceName")?;
2797 let tags = parse_tags(request)?;
2798
2799 let mut state = self.state.write();
2800 let tag_list = state.tags.get_mut(&resource_name).ok_or_else(|| {
2801 AwsServiceError::aws_error(
2802 StatusCode::NOT_FOUND,
2803 "CacheClusterNotFound",
2804 format!("The resource {resource_name} could not be found."),
2805 )
2806 })?;
2807
2808 merge_tags(tag_list, &tags);
2809
2810 let tag_xml: String = tag_list.iter().map(tag_xml).collect();
2811
2812 Ok(AwsResponse::xml(
2813 StatusCode::OK,
2814 xml_wrap(
2815 "AddTagsToResource",
2816 &format!("<TagList>{tag_xml}</TagList>"),
2817 &request.request_id,
2818 ),
2819 ))
2820 }
2821
2822 fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2823 let resource_name = required_param(request, "ResourceName")?;
2824
2825 let state = self.state.read();
2826 let tag_list = state.tags.get(&resource_name).ok_or_else(|| {
2827 AwsServiceError::aws_error(
2828 StatusCode::NOT_FOUND,
2829 "CacheClusterNotFound",
2830 format!("The resource {resource_name} could not be found."),
2831 )
2832 })?;
2833
2834 let tag_xml: String = tag_list.iter().map(tag_xml).collect();
2835
2836 Ok(AwsResponse::xml(
2837 StatusCode::OK,
2838 xml_wrap(
2839 "ListTagsForResource",
2840 &format!("<TagList>{tag_xml}</TagList>"),
2841 &request.request_id,
2842 ),
2843 ))
2844 }
2845
2846 fn remove_tags_from_resource(
2847 &self,
2848 request: &AwsRequest,
2849 ) -> Result<AwsResponse, AwsServiceError> {
2850 let resource_name = required_param(request, "ResourceName")?;
2851 let tag_keys = parse_tag_keys(request)?;
2852
2853 let mut state = self.state.write();
2854 let tag_list = state.tags.get_mut(&resource_name).ok_or_else(|| {
2855 AwsServiceError::aws_error(
2856 StatusCode::NOT_FOUND,
2857 "CacheClusterNotFound",
2858 format!("The resource {resource_name} could not be found."),
2859 )
2860 })?;
2861
2862 tag_list.retain(|(key, _)| !tag_keys.contains(key));
2863
2864 Ok(AwsResponse::xml(
2865 StatusCode::OK,
2866 xml_wrap("RemoveTagsFromResource", "", &request.request_id),
2867 ))
2868 }
2869}
2870
/// Returns the value of query parameter `name` from `req`, or `None` when the
/// underlying lookup yields nothing.
///
/// Thin wrapper over `fakecloud_core::query::optional_query_param`; what counts
/// as "absent" is decided by that helper.
fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
    fakecloud_core::query::optional_query_param(req, name)
}
2876
/// Returns the value of query parameter `name` from `req`, or the error
/// produced by the underlying lookup.
///
/// Thin wrapper over `fakecloud_core::query::required_query_param`.
/// NOTE(review): presumably the error is a missing-parameter fault when `name`
/// is absent; confirm against `fakecloud_core`.
fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
    fakecloud_core::query::required_query_param(req, name)
}
2880
2881fn parse_required_bool(req: &AwsRequest, name: &str) -> Result<bool, AwsServiceError> {
2882 parse_optional_bool(Some(&required_param(req, name)?))?.ok_or_else(|| {
2883 AwsServiceError::aws_error(
2884 StatusCode::BAD_REQUEST,
2885 "InvalidParameterValue",
2886 format!("Boolean parameter {name} is invalid."),
2887 )
2888 })
2889}
2890
/// Validates an engine name by delegating to `validate_engine`, so it accepts
/// exactly the engines listed in `SUPPORTED_ENGINES` and fails with
/// `InvalidParameterValue` otherwise.
fn validate_serverless_engine(engine: &str) -> Result<(), AwsServiceError> {
    validate_engine(engine)
}
2894
/// Returns the default major engine version for `engine`: `"8.0"` for Valkey
/// and `"7.1"` for every other engine name.
fn default_major_engine_version(engine: &str) -> &'static str {
    match engine {
        ENGINE_VALKEY => "8.0",
        _ => "7.1",
    }
}
2902
2903fn default_full_engine_version(
2904 engine: &str,
2905 major_engine_version: &str,
2906) -> Result<String, AwsServiceError> {
2907 if major_engine_version.is_empty() {
2908 return Err(AwsServiceError::aws_error(
2909 StatusCode::BAD_REQUEST,
2910 "InvalidParameterValue",
2911 "MajorEngineVersion must not be empty.".to_string(),
2912 ));
2913 }
2914
2915 if (engine == ENGINE_REDIS && !major_engine_version.starts_with('7'))
2916 || (engine == ENGINE_VALKEY && !major_engine_version.starts_with('8'))
2917 {
2918 return Err(AwsServiceError::aws_error(
2919 StatusCode::BAD_REQUEST,
2920 "InvalidParameterValue",
2921 format!(
2922 "MajorEngineVersion {major_engine_version} is not supported for engine {engine}."
2923 ),
2924 ));
2925 }
2926
2927 Ok(major_engine_version.to_string())
2928}
2929
2930fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
2931 value
2932 .map(|raw| match raw {
2933 "true" | "True" | "TRUE" => Ok(true),
2934 "false" | "False" | "FALSE" => Ok(false),
2935 _ => Err(AwsServiceError::aws_error(
2936 StatusCode::BAD_REQUEST,
2937 "InvalidParameterValue",
2938 format!("Boolean parameter value '{raw}' is invalid."),
2939 )),
2940 })
2941 .transpose()
2942}
2943
2944fn optional_non_negative_i32_param(
2945 req: &AwsRequest,
2946 name: &str,
2947) -> Result<Option<i32>, AwsServiceError> {
2948 optional_param(req, name)
2949 .map(|v| {
2950 let parsed = v.parse::<i32>().map_err(|_| {
2951 AwsServiceError::aws_error(
2952 StatusCode::BAD_REQUEST,
2953 "InvalidParameterValue",
2954 format!("Invalid value for {name}: '{v}'"),
2955 )
2956 })?;
2957 if parsed < 0 {
2958 return Err(AwsServiceError::aws_error(
2959 StatusCode::BAD_REQUEST,
2960 "InvalidParameterValue",
2961 format!("{name} must be non-negative, got {parsed}"),
2962 ));
2963 }
2964 Ok(parsed)
2965 })
2966 .transpose()
2967}
2968
2969fn parse_cache_usage_limits(
2970 req: &AwsRequest,
2971) -> Result<Option<ServerlessCacheUsageLimits>, AwsServiceError> {
2972 let data_storage_maximum =
2973 optional_non_negative_i32_param(req, "CacheUsageLimits.DataStorage.Maximum")?;
2974 let data_storage_minimum =
2975 optional_non_negative_i32_param(req, "CacheUsageLimits.DataStorage.Minimum")?;
2976 let data_storage_unit = optional_param(req, "CacheUsageLimits.DataStorage.Unit");
2977 let ecpu_maximum =
2978 optional_non_negative_i32_param(req, "CacheUsageLimits.ECPUPerSecond.Maximum")?;
2979 let ecpu_minimum =
2980 optional_non_negative_i32_param(req, "CacheUsageLimits.ECPUPerSecond.Minimum")?;
2981
2982 if let (Some(minimum), Some(maximum)) = (data_storage_minimum, data_storage_maximum) {
2983 if minimum > maximum {
2984 return Err(AwsServiceError::aws_error(
2985 StatusCode::BAD_REQUEST,
2986 "InvalidParameterValue",
2987 format!(
2988 "CacheUsageLimits.DataStorage.Minimum ({minimum}) must be less than or equal to Maximum ({maximum})."
2989 ),
2990 ));
2991 }
2992 }
2993 if let (Some(minimum), Some(maximum)) = (ecpu_minimum, ecpu_maximum) {
2994 if minimum > maximum {
2995 return Err(AwsServiceError::aws_error(
2996 StatusCode::BAD_REQUEST,
2997 "InvalidParameterValue",
2998 format!(
2999 "CacheUsageLimits.ECPUPerSecond.Minimum ({minimum}) must be less than or equal to Maximum ({maximum})."
3000 ),
3001 ));
3002 }
3003 }
3004
3005 let data_storage = if data_storage_maximum.is_some()
3006 || data_storage_minimum.is_some()
3007 || data_storage_unit.is_some()
3008 {
3009 Some(ServerlessCacheDataStorage {
3010 maximum: data_storage_maximum,
3011 minimum: data_storage_minimum,
3012 unit: data_storage_unit,
3013 })
3014 } else {
3015 None
3016 };
3017 let ecpu_per_second = if ecpu_maximum.is_some() || ecpu_minimum.is_some() {
3018 Some(ServerlessCacheEcpuPerSecond {
3019 maximum: ecpu_maximum,
3020 minimum: ecpu_minimum,
3021 })
3022 } else {
3023 None
3024 };
3025
3026 if data_storage.is_none() && ecpu_per_second.is_none() {
3027 Ok(None)
3028 } else {
3029 Ok(Some(ServerlessCacheUsageLimits {
3030 data_storage,
3031 ecpu_per_second,
3032 }))
3033 }
3034}
3035
/// Collects the values of an indexed list parameter.
///
/// Matches keys of the form `{param}.{member_name}.{N}` where `N` parses as a
/// `usize`, and returns the associated values ordered by ascending `N`.
/// Non-matching keys are ignored; the result is empty when nothing matches.
fn parse_member_list(
    params: &std::collections::HashMap<String, String>,
    param: &str,
    member_name: &str,
) -> Vec<String> {
    let prefix = format!("{param}.{member_name}.");

    let mut entries: Vec<(usize, &String)> = Vec::new();
    for (key, value) in params {
        let Some(suffix) = key.strip_prefix(prefix.as_str()) else {
            continue;
        };
        if let Ok(position) = suffix.parse::<usize>() {
            entries.push((position, value));
        }
    }

    // Map iteration order is arbitrary, so order explicitly by list index.
    entries.sort_by_key(|&(position, _)| position);
    entries
        .into_iter()
        .map(|(_, value)| value.clone())
        .collect()
}
3059
3060fn parse_query_list_param(req: &AwsRequest, param: &str, member_name: &str) -> Vec<String> {
3061 let mut indexed = parse_member_list(&req.query_params, param, member_name);
3062 if indexed.is_empty() {
3063 indexed = parse_member_list(&req.query_params, param, "member");
3064 }
3065 if indexed.is_empty() {
3066 indexed = req.query_params.get(param).into_iter().cloned().collect();
3067 }
3068 indexed
3069}
3070
3071fn optional_usize_param(req: &AwsRequest, name: &str) -> Result<Option<usize>, AwsServiceError> {
3072 optional_param(req, name)
3073 .map(|v| {
3074 v.parse::<usize>().map_err(|_| {
3075 AwsServiceError::aws_error(
3076 StatusCode::BAD_REQUEST,
3077 "InvalidParameterValue",
3078 format!("Value '{v}' for parameter {name} is not a valid integer."),
3079 )
3080 })
3081 })
3082 .transpose()
3083}
3084
3085fn parse_reserved_duration_filter(value: Option<String>) -> Result<Option<i32>, AwsServiceError> {
3086 value
3087 .map(|raw| match raw.as_str() {
3088 "1" => Ok(31_536_000),
3089 "3" => Ok(94_608_000),
3090 "31536000" => Ok(31_536_000),
3091 "94608000" => Ok(94_608_000),
3092 _ => Err(AwsServiceError::aws_error(
3093 StatusCode::BAD_REQUEST,
3094 "InvalidParameterValue",
3095 format!(
3096 "Invalid value for Duration: {raw}. Valid values are 1, 3, 31536000, or 94608000."
3097 ),
3098 )),
3099 })
3100 .transpose()
3101}
3102
/// Returns one page of `items` together with the marker for the next page.
///
/// * `marker` - stringified start offset into `items`; a missing or
///   unparsable marker starts from the beginning.
/// * `max_records` - requested page size; defaults to 100 and is clamped to
///   the range `1..=100`.
///
/// The second tuple element is `Some(offset)` when more items remain after the
/// returned page and `None` otherwise. A start offset at or past the end of
/// `items` yields an empty page with no marker.
fn paginate<T: Clone>(
    items: &[T],
    marker: Option<&str>,
    max_records: Option<usize>,
) -> (Vec<T>, Option<String>) {
    let start = marker.and_then(|m| m.parse::<usize>().ok()).unwrap_or(0);
    // A page size of zero would produce an empty page whose next marker equals
    // the current offset, so a client following the marker would loop forever.
    // Always advance by at least one item.
    let limit = max_records.unwrap_or(100).clamp(1, 100);

    if start >= items.len() {
        return (Vec::new(), None);
    }

    let end = start.saturating_add(limit).min(items.len());
    let page = items[start..end].to_vec();
    let next_marker = (end < items.len()).then(|| end.to_string());
    (page, next_marker)
}
3125
3126fn parse_tags(req: &AwsRequest) -> Result<Vec<(String, String)>, AwsServiceError> {
3129 let mut tags = Vec::new();
3130 for index in 1.. {
3131 let key_name = format!("Tags.Tag.{index}.Key");
3132 let value_name = format!("Tags.Tag.{index}.Value");
3133 let key = optional_param(req, &key_name);
3134 let value = optional_param(req, &value_name);
3135 match (key, value) {
3136 (Some(k), Some(v)) => tags.push((k, v)),
3137 (None, None) => break,
3138 _ => {
3139 return Err(AwsServiceError::aws_error(
3140 StatusCode::BAD_REQUEST,
3141 "InvalidParameterValue",
3142 "Each tag must include both Key and Value.",
3143 ));
3144 }
3145 }
3146 }
3147 Ok(tags)
3148}
3149
3150fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
3151 let mut keys = Vec::new();
3152 for index in 1.. {
3153 let key_name = format!("TagKeys.member.{index}");
3154 match optional_param(req, &key_name) {
3155 Some(key) => keys.push(key),
3156 None => break,
3157 }
3158 }
3159 Ok(keys)
3160}
3161
/// Merges `incoming` tags into `existing` in place.
///
/// A tag whose key is already present has its value overwritten while keeping
/// its position; tags with new keys are appended in the order they appear in
/// `incoming`.
fn merge_tags(existing: &mut Vec<(String, String)>, incoming: &[(String, String)]) {
    for (new_key, new_value) in incoming {
        match existing.iter().position(|(key, _)| key == new_key) {
            Some(slot) => existing[slot].1 = new_value.clone(),
            None => existing.push((new_key.clone(), new_value.clone())),
        }
    }
}
3171
3172fn tag_xml(tag: &(String, String)) -> String {
3173 format!(
3174 "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
3175 xml_escape(&tag.0),
3176 xml_escape(&tag.1),
3177 )
3178}
3179
3180fn filter_engine_versions(
3183 versions: &[CacheEngineVersion],
3184 engine: &Option<String>,
3185 engine_version: &Option<String>,
3186 family: &Option<String>,
3187) -> Vec<CacheEngineVersion> {
3188 versions
3189 .iter()
3190 .filter(|v| engine.as_ref().is_none_or(|expected| v.engine == *expected))
3191 .filter(|v| {
3192 engine_version
3193 .as_ref()
3194 .is_none_or(|expected| v.engine_version == *expected)
3195 })
3196 .filter(|v| {
3197 family
3198 .as_ref()
3199 .is_none_or(|expected| v.cache_parameter_group_family == *expected)
3200 })
3201 .cloned()
3202 .collect()
3203}
3204
3205fn xml_wrap(action: &str, inner: &str, request_id: &str) -> String {
3208 fakecloud_core::query::query_response_xml(action, ELASTICACHE_NS, inner, request_id)
3209}
3210
3211fn engine_version_xml(v: &CacheEngineVersion) -> String {
3212 format!(
3213 "<CacheEngineVersion>\
3214 <Engine>{}</Engine>\
3215 <EngineVersion>{}</EngineVersion>\
3216 <CacheParameterGroupFamily>{}</CacheParameterGroupFamily>\
3217 <CacheEngineDescription>{}</CacheEngineDescription>\
3218 <CacheEngineVersionDescription>{}</CacheEngineVersionDescription>\
3219 </CacheEngineVersion>",
3220 xml_escape(&v.engine),
3221 xml_escape(&v.engine_version),
3222 xml_escape(&v.cache_parameter_group_family),
3223 xml_escape(&v.cache_engine_description),
3224 xml_escape(&v.cache_engine_version_description),
3225 )
3226}
3227
3228fn recurring_charge_xml(charge: &RecurringCharge) -> String {
3229 format!(
3230 "<RecurringCharge>\
3231 <RecurringChargeAmount>{}</RecurringChargeAmount>\
3232 <RecurringChargeFrequency>{}</RecurringChargeFrequency>\
3233 </RecurringCharge>",
3234 charge.recurring_charge_amount,
3235 xml_escape(&charge.recurring_charge_frequency),
3236 )
3237}
3238
3239fn reserved_cache_node_xml(node: &ReservedCacheNode) -> String {
3240 let recurring_charges_xml: String = node
3241 .recurring_charges
3242 .iter()
3243 .map(recurring_charge_xml)
3244 .collect();
3245
3246 format!(
3247 "<ReservedCacheNode>\
3248 <ReservedCacheNodeId>{}</ReservedCacheNodeId>\
3249 <ReservedCacheNodesOfferingId>{}</ReservedCacheNodesOfferingId>\
3250 <CacheNodeType>{}</CacheNodeType>\
3251 <StartTime>{}</StartTime>\
3252 <Duration>{}</Duration>\
3253 <FixedPrice>{}</FixedPrice>\
3254 <UsagePrice>{}</UsagePrice>\
3255 <CacheNodeCount>{}</CacheNodeCount>\
3256 <ProductDescription>{}</ProductDescription>\
3257 <OfferingType>{}</OfferingType>\
3258 <State>{}</State>\
3259 <RecurringCharges>{}</RecurringCharges>\
3260 <ReservationARN>{}</ReservationARN>\
3261 </ReservedCacheNode>",
3262 xml_escape(&node.reserved_cache_node_id),
3263 xml_escape(&node.reserved_cache_nodes_offering_id),
3264 xml_escape(&node.cache_node_type),
3265 xml_escape(&node.start_time),
3266 node.duration,
3267 node.fixed_price,
3268 node.usage_price,
3269 node.cache_node_count,
3270 xml_escape(&node.product_description),
3271 xml_escape(&node.offering_type),
3272 xml_escape(&node.state),
3273 recurring_charges_xml,
3274 xml_escape(&node.reservation_arn),
3275 )
3276}
3277
3278fn reserved_cache_nodes_offering_xml(offering: &ReservedCacheNodesOffering) -> String {
3279 let recurring_charges_xml: String = offering
3280 .recurring_charges
3281 .iter()
3282 .map(recurring_charge_xml)
3283 .collect();
3284
3285 format!(
3286 "<ReservedCacheNodesOffering>\
3287 <ReservedCacheNodesOfferingId>{}</ReservedCacheNodesOfferingId>\
3288 <CacheNodeType>{}</CacheNodeType>\
3289 <Duration>{}</Duration>\
3290 <FixedPrice>{}</FixedPrice>\
3291 <UsagePrice>{}</UsagePrice>\
3292 <ProductDescription>{}</ProductDescription>\
3293 <OfferingType>{}</OfferingType>\
3294 <RecurringCharges>{}</RecurringCharges>\
3295 </ReservedCacheNodesOffering>",
3296 xml_escape(&offering.reserved_cache_nodes_offering_id),
3297 xml_escape(&offering.cache_node_type),
3298 offering.duration,
3299 offering.fixed_price,
3300 offering.usage_price,
3301 xml_escape(&offering.product_description),
3302 xml_escape(&offering.offering_type),
3303 recurring_charges_xml,
3304 )
3305}
3306
3307fn cache_parameter_group_xml(g: &CacheParameterGroup) -> String {
3308 format!(
3309 "<CacheParameterGroup>\
3310 <CacheParameterGroupName>{}</CacheParameterGroupName>\
3311 <CacheParameterGroupFamily>{}</CacheParameterGroupFamily>\
3312 <Description>{}</Description>\
3313 <IsGlobal>{}</IsGlobal>\
3314 <ARN>{}</ARN>\
3315 </CacheParameterGroup>",
3316 xml_escape(&g.cache_parameter_group_name),
3317 xml_escape(&g.cache_parameter_group_family),
3318 xml_escape(&g.description),
3319 g.is_global,
3320 xml_escape(&g.arn),
3321 )
3322}
3323
3324fn cache_subnet_group_xml(g: &CacheSubnetGroup, region: &str) -> String {
3325 let subnets_xml: String = g
3326 .subnet_ids
3327 .iter()
3328 .enumerate()
3329 .map(|(i, id)| {
3330 let az = format!("{}{}", region, (b'a' + (i % 6) as u8) as char);
3331 format!(
3332 "<Subnet>\
3333 <SubnetIdentifier>{}</SubnetIdentifier>\
3334 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
3335 </Subnet>",
3336 xml_escape(id),
3337 xml_escape(&az),
3338 )
3339 })
3340 .collect();
3341 format!(
3342 "<CacheSubnetGroupName>{}</CacheSubnetGroupName>\
3343 <CacheSubnetGroupDescription>{}</CacheSubnetGroupDescription>\
3344 <VpcId>{}</VpcId>\
3345 <Subnets>{subnets_xml}</Subnets>\
3346 <ARN>{}</ARN>",
3347 xml_escape(&g.cache_subnet_group_name),
3348 xml_escape(&g.cache_subnet_group_description),
3349 xml_escape(&g.vpc_id),
3350 xml_escape(&g.arn),
3351 )
3352}
3353
3354fn cache_cluster_xml(cluster: &CacheCluster, show_cache_node_info: bool) -> String {
3355 let cache_subnet_group_name_xml = cluster
3356 .cache_subnet_group_name
3357 .as_ref()
3358 .map(|name| {
3359 format!(
3360 "<CacheSubnetGroupName>{}</CacheSubnetGroupName>",
3361 xml_escape(name)
3362 )
3363 })
3364 .unwrap_or_default();
3365 let replication_group_id_xml = cluster
3366 .replication_group_id
3367 .as_ref()
3368 .map(|group_id| {
3369 format!(
3370 "<ReplicationGroupId>{}</ReplicationGroupId>",
3371 xml_escape(group_id)
3372 )
3373 })
3374 .unwrap_or_default();
3375 let cache_nodes_xml = if show_cache_node_info {
3376 match usize::try_from(cluster.num_cache_nodes) {
3377 Ok(node_count) => {
3378 let members: String = (0..node_count)
3379 .filter_map(|index| {
3380 let node_id = index.checked_add(1)?;
3381 Some(cache_node_xml(cluster, node_id))
3382 })
3383 .collect();
3384 format!("<CacheNodes>{members}</CacheNodes>")
3385 }
3386 Err(_) => String::new(),
3387 }
3388 } else {
3389 String::new()
3390 };
3391
3392 format!(
3393 "<CacheClusterId>{}</CacheClusterId>\
3394 <CacheNodeType>{}</CacheNodeType>\
3395 <Engine>{}</Engine>\
3396 <EngineVersion>{}</EngineVersion>\
3397 <CacheClusterStatus>{}</CacheClusterStatus>\
3398 <NumCacheNodes>{}</NumCacheNodes>\
3399 <PreferredAvailabilityZone>{}</PreferredAvailabilityZone>\
3400 <CacheClusterCreateTime>{}</CacheClusterCreateTime>\
3401 {cache_subnet_group_name_xml}\
3402 {cache_nodes_xml}\
3403 <AutoMinorVersionUpgrade>{}</AutoMinorVersionUpgrade>\
3404 {replication_group_id_xml}\
3405 <ARN>{}</ARN>",
3406 xml_escape(&cluster.cache_cluster_id),
3407 xml_escape(&cluster.cache_node_type),
3408 xml_escape(&cluster.engine),
3409 xml_escape(&cluster.engine_version),
3410 xml_escape(&cluster.cache_cluster_status),
3411 cluster.num_cache_nodes,
3412 xml_escape(&cluster.preferred_availability_zone),
3413 xml_escape(&cluster.created_at),
3414 cluster.auto_minor_version_upgrade,
3415 xml_escape(&cluster.arn),
3416 )
3417}
3418
3419fn cache_node_xml(cluster: &CacheCluster, node_id: usize) -> String {
3420 format!(
3421 "<CacheNode>\
3422 <CacheNodeId>{node_id:04}</CacheNodeId>\
3423 <CacheNodeStatus>{}</CacheNodeStatus>\
3424 <CacheNodeCreateTime>{}</CacheNodeCreateTime>\
3425 <Endpoint><Address>{}</Address><Port>{}</Port></Endpoint>\
3426 <ParameterGroupStatus>in-sync</ParameterGroupStatus>\
3427 <CustomerAvailabilityZone>{}</CustomerAvailabilityZone>\
3428 </CacheNode>",
3429 xml_escape(&cluster.cache_cluster_status),
3430 xml_escape(&cluster.created_at),
3431 xml_escape(&cluster.endpoint_address),
3432 cluster.endpoint_port,
3433 xml_escape(&cluster.preferred_availability_zone),
3434 )
3435}
3436
3437fn replication_group_xml(g: &ReplicationGroup, region: &str) -> String {
3438 let member_clusters_xml: String = g
3439 .member_clusters
3440 .iter()
3441 .map(|c| format!("<ClusterId>{}</ClusterId>", xml_escape(c)))
3442 .collect();
3443 let global_replication_group_info_xml = g
3444 .global_replication_group_id
3445 .as_ref()
3446 .map(|global_replication_group_id| {
3447 format!(
3448 "<GlobalReplicationGroupInfo>\
3449 <GlobalReplicationGroupId>{}</GlobalReplicationGroupId>\
3450 <GlobalReplicationGroupMemberRole>{}</GlobalReplicationGroupMemberRole>\
3451 </GlobalReplicationGroupInfo>",
3452 xml_escape(global_replication_group_id),
3453 xml_escape(
3454 g.global_replication_group_role
3455 .as_deref()
3456 .unwrap_or("primary")
3457 ),
3458 )
3459 })
3460 .unwrap_or_default();
3461
3462 let primary_az = format!("{region}a");
3463
3464 format!(
3465 "<ReplicationGroupId>{}</ReplicationGroupId>\
3466 <Description>{}</Description>\
3467 {global_replication_group_info_xml}\
3468 <Status>{}</Status>\
3469 <MemberClusters>{member_clusters_xml}</MemberClusters>\
3470 <NodeGroups>\
3471 <NodeGroup>\
3472 <NodeGroupId>0001</NodeGroupId>\
3473 <Status>available</Status>\
3474 <PrimaryEndpoint>\
3475 <Address>{}</Address>\
3476 <Port>{}</Port>\
3477 </PrimaryEndpoint>\
3478 <ReaderEndpoint>\
3479 <Address>{}</Address>\
3480 <Port>{}</Port>\
3481 </ReaderEndpoint>\
3482 <NodeGroupMembers>\
3483 <NodeGroupMember>\
3484 <CacheClusterId>{}</CacheClusterId>\
3485 <CacheNodeId>0001</CacheNodeId>\
3486 <PreferredAvailabilityZone>{}</PreferredAvailabilityZone>\
3487 <CurrentRole>primary</CurrentRole>\
3488 </NodeGroupMember>\
3489 </NodeGroupMembers>\
3490 </NodeGroup>\
3491 </NodeGroups>\
3492 <AutomaticFailover>{}</AutomaticFailover>\
3493 <SnapshotRetentionLimit>{}</SnapshotRetentionLimit>\
3494 <SnapshotWindow>{}</SnapshotWindow>\
3495 <ClusterEnabled>false</ClusterEnabled>\
3496 <CacheNodeType>{}</CacheNodeType>\
3497 <TransitEncryptionEnabled>false</TransitEncryptionEnabled>\
3498 <AtRestEncryptionEnabled>false</AtRestEncryptionEnabled>\
3499 <ARN>{}</ARN>",
3500 xml_escape(&g.replication_group_id),
3501 xml_escape(&g.description),
3502 xml_escape(&g.status),
3503 xml_escape(&g.endpoint_address),
3504 g.endpoint_port,
3505 xml_escape(&g.endpoint_address),
3506 g.endpoint_port,
3507 xml_escape(g.member_clusters.first().map(|s| s.as_str()).unwrap_or("")),
3508 xml_escape(&primary_az),
3509 if g.automatic_failover_enabled {
3510 "enabled"
3511 } else {
3512 "disabled"
3513 },
3514 g.snapshot_retention_limit,
3515 xml_escape(&g.snapshot_window),
3516 xml_escape(&g.cache_node_type),
3517 xml_escape(&g.arn),
3518 )
3519}
3520
/// Builds a global replication group id of the form `fc-{region}-{suffix}`.
fn global_replication_group_id(region: &str, suffix: &str) -> String {
    format!("fc-{region}-{suffix}")
}
3524
3525fn primary_global_member(group: &GlobalReplicationGroup) -> Option<&GlobalReplicationGroupMember> {
3526 group.members.iter().find(|member| member.role == "primary")
3527}
3528
3529fn global_replication_group_xml(group: &GlobalReplicationGroup, show_member_info: bool) -> String {
3530 let members_xml = if show_member_info {
3531 let members_xml: String = group
3532 .members
3533 .iter()
3534 .map(global_replication_group_member_xml)
3535 .collect();
3536 format!("<Members>{members_xml}</Members>")
3537 } else {
3538 String::new()
3539 };
3540 let global_node_groups_xml = if group.cluster_enabled {
3541 "<GlobalNodeGroups><GlobalNodeGroup><GlobalNodeGroupId>0001</GlobalNodeGroupId><Slots>0-16383</Slots></GlobalNodeGroup></GlobalNodeGroups>".to_string()
3542 } else {
3543 String::new()
3544 };
3545
3546 format!(
3547 "<GlobalReplicationGroupId>{}</GlobalReplicationGroupId>\
3548 <GlobalReplicationGroupDescription>{}</GlobalReplicationGroupDescription>\
3549 <Status>{}</Status>\
3550 <CacheNodeType>{}</CacheNodeType>\
3551 <Engine>{}</Engine>\
3552 <EngineVersion>{}</EngineVersion>\
3553 {members_xml}\
3554 <ClusterEnabled>{}</ClusterEnabled>\
3555 {global_node_groups_xml}\
3556 <AuthTokenEnabled>false</AuthTokenEnabled>\
3557 <TransitEncryptionEnabled>false</TransitEncryptionEnabled>\
3558 <AtRestEncryptionEnabled>false</AtRestEncryptionEnabled>\
3559 <ARN>{}</ARN>",
3560 xml_escape(&group.global_replication_group_id),
3561 xml_escape(&group.global_replication_group_description),
3562 xml_escape(&group.status),
3563 xml_escape(&group.cache_node_type),
3564 xml_escape(&group.engine),
3565 xml_escape(&group.engine_version),
3566 group.cluster_enabled,
3567 xml_escape(&group.arn),
3568 )
3569}
3570
3571fn global_replication_group_member_xml(member: &GlobalReplicationGroupMember) -> String {
3572 format!(
3573 "<GlobalReplicationGroupMember>\
3574 <ReplicationGroupId>{}</ReplicationGroupId>\
3575 <ReplicationGroupRegion>{}</ReplicationGroupRegion>\
3576 <Role>{}</Role>\
3577 <AutomaticFailover>{}</AutomaticFailover>\
3578 <Status>{}</Status>\
3579 </GlobalReplicationGroupMember>",
3580 xml_escape(&member.replication_group_id),
3581 xml_escape(&member.replication_group_region),
3582 xml_escape(&member.role),
3583 if member.automatic_failover {
3584 "enabled"
3585 } else {
3586 "disabled"
3587 },
3588 xml_escape(&member.status),
3589 )
3590}
3591
3592fn user_xml(u: &ElastiCacheUser) -> String {
3593 let user_group_ids_xml: String = u
3594 .user_group_ids
3595 .iter()
3596 .map(|id| format!("<member>{}</member>", xml_escape(id)))
3597 .collect();
3598 format!(
3599 "<UserId>{}</UserId>\
3600 <UserName>{}</UserName>\
3601 <Status>{}</Status>\
3602 <Engine>{}</Engine>\
3603 <MinimumEngineVersion>{}</MinimumEngineVersion>\
3604 <AccessString>{}</AccessString>\
3605 <UserGroupIds>{user_group_ids_xml}</UserGroupIds>\
3606 <Authentication>\
3607 <Type>{}</Type>\
3608 <PasswordCount>{}</PasswordCount>\
3609 </Authentication>\
3610 <ARN>{}</ARN>",
3611 xml_escape(&u.user_id),
3612 xml_escape(&u.user_name),
3613 xml_escape(&u.status),
3614 xml_escape(&u.engine),
3615 xml_escape(&u.minimum_engine_version),
3616 xml_escape(&u.access_string),
3617 xml_escape(&u.authentication_type),
3618 u.password_count,
3619 xml_escape(&u.arn),
3620 )
3621}
3622
3623fn user_group_xml(g: &ElastiCacheUserGroup) -> String {
3624 let user_ids_xml: String = g
3625 .user_ids
3626 .iter()
3627 .map(|id| format!("<member>{}</member>", xml_escape(id)))
3628 .collect();
3629 let replication_groups_xml: String = g
3630 .replication_groups
3631 .iter()
3632 .map(|id| format!("<member>{}</member>", xml_escape(id)))
3633 .collect();
3634 let pending_xml = if let Some(ref pc) = g.pending_changes {
3635 let to_add: String = pc
3636 .user_ids_to_add
3637 .iter()
3638 .map(|id| format!("<member>{}</member>", xml_escape(id)))
3639 .collect();
3640 let to_remove: String = pc
3641 .user_ids_to_remove
3642 .iter()
3643 .map(|id| format!("<member>{}</member>", xml_escape(id)))
3644 .collect();
3645 format!(
3646 "<PendingChanges>\
3647 <UserIdsToAdd>{to_add}</UserIdsToAdd>\
3648 <UserIdsToRemove>{to_remove}</UserIdsToRemove>\
3649 </PendingChanges>"
3650 )
3651 } else {
3652 String::new()
3653 };
3654 format!(
3655 "<UserGroupId>{}</UserGroupId>\
3656 <Status>{}</Status>\
3657 <Engine>{}</Engine>\
3658 <MinimumEngineVersion>{}</MinimumEngineVersion>\
3659 <UserIds>{user_ids_xml}</UserIds>\
3660 <ReplicationGroups>{replication_groups_xml}</ReplicationGroups>\
3661 {pending_xml}\
3662 <ARN>{}</ARN>",
3663 xml_escape(&g.user_group_id),
3664 xml_escape(&g.status),
3665 xml_escape(&g.engine),
3666 xml_escape(&g.minimum_engine_version),
3667 xml_escape(&g.arn),
3668 )
3669}
3670
3671fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
3672 match error {
3673 RuntimeError::Unavailable => AwsServiceError::aws_error(
3674 StatusCode::SERVICE_UNAVAILABLE,
3675 "InvalidParameterValue",
3676 "Docker/Podman is required for ElastiCache replication groups but is not available"
3677 .to_string(),
3678 ),
3679 RuntimeError::ContainerStartFailed(msg) => AwsServiceError::aws_error(
3680 StatusCode::INTERNAL_SERVER_ERROR,
3681 "InvalidParameterValue",
3682 format!("Failed to start Redis container: {msg}"),
3683 ),
3684 }
3685}
3686
3687fn add_cluster_to_replication_group(
3688 state: &mut ElastiCacheState,
3689 replication_group_id: &str,
3690 cache_cluster_id: &str,
3691) {
3692 if let Some(group) = state.replication_groups.get_mut(replication_group_id) {
3693 if !group
3694 .member_clusters
3695 .iter()
3696 .any(|id| id == cache_cluster_id)
3697 {
3698 group.member_clusters.push(cache_cluster_id.to_string());
3699 group.num_cache_clusters = group.member_clusters.len() as i32;
3700 }
3701 }
3702}
3703
3704fn remove_cluster_from_replication_group(
3705 state: &mut ElastiCacheState,
3706 replication_group_id: &str,
3707 cache_cluster_id: &str,
3708) {
3709 if let Some(group) = state.replication_groups.get_mut(replication_group_id) {
3710 let original_len = group.member_clusters.len();
3711 group.member_clusters.retain(|id| id != cache_cluster_id);
3712 if group.member_clusters.len() != original_len {
3713 group.num_cache_clusters = group.member_clusters.len() as i32;
3714 }
3715 }
3716}
3717
3718fn snapshot_xml(s: &CacheSnapshot) -> String {
3719 format!(
3720 "<SnapshotName>{}</SnapshotName>\
3721 <ReplicationGroupId>{}</ReplicationGroupId>\
3722 <ReplicationGroupDescription>{}</ReplicationGroupDescription>\
3723 <SnapshotStatus>{}</SnapshotStatus>\
3724 <SnapshotSource>{}</SnapshotSource>\
3725 <CacheNodeType>{}</CacheNodeType>\
3726 <Engine>{}</Engine>\
3727 <EngineVersion>{}</EngineVersion>\
3728 <NumCacheClusters>{}</NumCacheClusters>\
3729 <ARN>{}</ARN>",
3730 xml_escape(&s.snapshot_name),
3731 xml_escape(&s.replication_group_id),
3732 xml_escape(&s.replication_group_description),
3733 xml_escape(&s.snapshot_status),
3734 xml_escape(&s.snapshot_source),
3735 xml_escape(&s.cache_node_type),
3736 xml_escape(&s.engine),
3737 xml_escape(&s.engine_version),
3738 s.num_cache_clusters,
3739 xml_escape(&s.arn),
3740 )
3741}
3742
3743fn serverless_cache_xml(cache: &ServerlessCache) -> String {
3744 let cache_usage_limits_xml = cache
3745 .cache_usage_limits
3746 .as_ref()
3747 .map(serverless_cache_usage_limits_xml)
3748 .unwrap_or_default();
3749 let kms_key_id_xml = cache
3750 .kms_key_id
3751 .as_ref()
3752 .map(|value| format!("<KmsKeyId>{}</KmsKeyId>", xml_escape(value)))
3753 .unwrap_or_default();
3754 let security_group_ids_xml = if cache.security_group_ids.is_empty() {
3755 String::new()
3756 } else {
3757 let members: String = cache
3758 .security_group_ids
3759 .iter()
3760 .map(|id| format!("<SecurityGroupId>{}</SecurityGroupId>", xml_escape(id)))
3761 .collect();
3762 format!("<SecurityGroupIds>{members}</SecurityGroupIds>")
3763 };
3764 let subnet_ids_xml = if cache.subnet_ids.is_empty() {
3765 String::new()
3766 } else {
3767 let members: String = cache
3768 .subnet_ids
3769 .iter()
3770 .map(|id| format!("<member>{}</member>", xml_escape(id)))
3771 .collect();
3772 format!("<SubnetIds>{members}</SubnetIds>")
3773 };
3774 let user_group_id_xml = cache
3775 .user_group_id
3776 .as_ref()
3777 .map(|value| format!("<UserGroupId>{}</UserGroupId>", xml_escape(value)))
3778 .unwrap_or_default();
3779 let snapshot_retention_limit_xml = cache
3780 .snapshot_retention_limit
3781 .map(|value| format!("<SnapshotRetentionLimit>{value}</SnapshotRetentionLimit>"))
3782 .unwrap_or_default();
3783 let daily_snapshot_time_xml = cache
3784 .daily_snapshot_time
3785 .as_ref()
3786 .map(|value| {
3787 format!(
3788 "<DailySnapshotTime>{}</DailySnapshotTime>",
3789 xml_escape(value)
3790 )
3791 })
3792 .unwrap_or_default();
3793
3794 format!(
3795 "<ServerlessCacheName>{}</ServerlessCacheName>\
3796 <Description>{}</Description>\
3797 <CreateTime>{}</CreateTime>\
3798 <Status>{}</Status>\
3799 <Engine>{}</Engine>\
3800 <MajorEngineVersion>{}</MajorEngineVersion>\
3801 <FullEngineVersion>{}</FullEngineVersion>\
3802 {cache_usage_limits_xml}\
3803 {kms_key_id_xml}\
3804 {security_group_ids_xml}\
3805 <Endpoint>{}</Endpoint>\
3806 <ReaderEndpoint>{}</ReaderEndpoint>\
3807 <ARN>{}</ARN>\
3808 {user_group_id_xml}\
3809 {subnet_ids_xml}\
3810 {snapshot_retention_limit_xml}\
3811 {daily_snapshot_time_xml}",
3812 xml_escape(&cache.serverless_cache_name),
3813 xml_escape(&cache.description),
3814 xml_escape(&cache.created_at),
3815 xml_escape(&cache.status),
3816 xml_escape(&cache.engine),
3817 xml_escape(&cache.major_engine_version),
3818 xml_escape(&cache.full_engine_version),
3819 serverless_cache_endpoint_xml(&cache.endpoint),
3820 serverless_cache_endpoint_xml(&cache.reader_endpoint),
3821 xml_escape(&cache.arn),
3822 )
3823}
3824
3825fn serverless_cache_usage_limits_xml(limits: &ServerlessCacheUsageLimits) -> String {
3826 let data_storage_xml = limits
3827 .data_storage
3828 .as_ref()
3829 .map(|data_storage| {
3830 let maximum_xml = data_storage
3831 .maximum
3832 .map(|value| format!("<Maximum>{value}</Maximum>"))
3833 .unwrap_or_default();
3834 let minimum_xml = data_storage
3835 .minimum
3836 .map(|value| format!("<Minimum>{value}</Minimum>"))
3837 .unwrap_or_default();
3838 let unit_xml = data_storage
3839 .unit
3840 .as_ref()
3841 .map(|value| format!("<Unit>{}</Unit>", xml_escape(value)))
3842 .unwrap_or_default();
3843 format!("<DataStorage>{maximum_xml}{minimum_xml}{unit_xml}</DataStorage>")
3844 })
3845 .unwrap_or_default();
3846 let ecpu_per_second_xml = limits
3847 .ecpu_per_second
3848 .as_ref()
3849 .map(|ecpu| {
3850 let maximum_xml = ecpu
3851 .maximum
3852 .map(|value| format!("<Maximum>{value}</Maximum>"))
3853 .unwrap_or_default();
3854 let minimum_xml = ecpu
3855 .minimum
3856 .map(|value| format!("<Minimum>{value}</Minimum>"))
3857 .unwrap_or_default();
3858 format!("<ECPUPerSecond>{maximum_xml}{minimum_xml}</ECPUPerSecond>")
3859 })
3860 .unwrap_or_default();
3861
3862 format!("<CacheUsageLimits>{data_storage_xml}{ecpu_per_second_xml}</CacheUsageLimits>")
3863}
3864
3865fn serverless_cache_endpoint_xml(endpoint: &ServerlessCacheEndpoint) -> String {
3866 format!(
3867 "<Address>{}</Address><Port>{}</Port>",
3868 xml_escape(&endpoint.address),
3869 endpoint.port,
3870 )
3871}
3872
3873fn serverless_cache_snapshot_xml(snapshot: &ServerlessCacheSnapshot) -> String {
3874 let kms_key_id_xml = snapshot
3875 .kms_key_id
3876 .as_ref()
3877 .map(|value| format!("<KmsKeyId>{}</KmsKeyId>", xml_escape(value)))
3878 .unwrap_or_default();
3879 let expiry_time_xml = snapshot
3880 .expiry_time
3881 .as_ref()
3882 .map(|value| format!("<ExpiryTime>{}</ExpiryTime>", xml_escape(value)))
3883 .unwrap_or_default();
3884 let bytes_used_for_cache_xml = snapshot
3885 .bytes_used_for_cache
3886 .as_ref()
3887 .map(|value| {
3888 format!(
3889 "<BytesUsedForCache>{}</BytesUsedForCache>",
3890 xml_escape(value)
3891 )
3892 })
3893 .unwrap_or_default();
3894
3895 format!(
3896 "<ServerlessCacheSnapshotName>{}</ServerlessCacheSnapshotName>\
3897 <ARN>{}</ARN>\
3898 {kms_key_id_xml}\
3899 <SnapshotType>{}</SnapshotType>\
3900 <Status>{}</Status>\
3901 <CreateTime>{}</CreateTime>\
3902 {expiry_time_xml}\
3903 {bytes_used_for_cache_xml}\
3904 <ServerlessCacheConfiguration>\
3905 <ServerlessCacheName>{}</ServerlessCacheName>\
3906 <Engine>{}</Engine>\
3907 <MajorEngineVersion>{}</MajorEngineVersion>\
3908 </ServerlessCacheConfiguration>",
3909 xml_escape(&snapshot.serverless_cache_snapshot_name),
3910 xml_escape(&snapshot.arn),
3911 xml_escape(&snapshot.snapshot_type),
3912 xml_escape(&snapshot.status),
3913 xml_escape(&snapshot.create_time),
3914 xml_escape(&snapshot.serverless_cache_name),
3915 xml_escape(&snapshot.engine),
3916 xml_escape(&snapshot.major_engine_version),
3917 )
3918}
3919
3920fn parameter_xml(p: &EngineDefaultParameter) -> String {
3921 format!(
3922 "<Parameter>\
3923 <ParameterName>{}</ParameterName>\
3924 <ParameterValue>{}</ParameterValue>\
3925 <Description>{}</Description>\
3926 <Source>{}</Source>\
3927 <DataType>{}</DataType>\
3928 <AllowedValues>{}</AllowedValues>\
3929 <IsModifiable>{}</IsModifiable>\
3930 <MinimumEngineVersion>{}</MinimumEngineVersion>\
3931 </Parameter>",
3932 xml_escape(&p.parameter_name),
3933 xml_escape(&p.parameter_value),
3934 xml_escape(&p.description),
3935 xml_escape(&p.source),
3936 xml_escape(&p.data_type),
3937 xml_escape(&p.allowed_values),
3938 p.is_modifiable,
3939 xml_escape(&p.minimum_engine_version),
3940 )
3941}
3942
3943#[cfg(test)]
3944mod tests {
3945 use super::*;
3946 use crate::state::default_engine_versions;
3947 use bytes::Bytes;
3948 use http::{HeaderMap, Method};
3949 use std::collections::HashMap;
3950
3951 fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
3952 let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
3953 for (key, value) in params {
3954 query_params.insert((*key).to_string(), (*value).to_string());
3955 }
3956
3957 AwsRequest {
3958 service: "elasticache".to_string(),
3959 action: action.to_string(),
3960 region: "us-east-1".to_string(),
3961 account_id: "123456789012".to_string(),
3962 request_id: "test-request-id".to_string(),
3963 headers: HeaderMap::new(),
3964 query_params,
3965 body: Bytes::new(),
3966 path_segments: vec![],
3967 raw_path: "/".to_string(),
3968 raw_query: String::new(),
3969 method: http::Method::POST,
3970 is_query_protocol: true,
3971 access_key_id: None,
3972 principal: None,
3973 }
3974 }
3975
3976 fn sample_reserved_cache_node_offering(id: &str) -> ReservedCacheNodesOffering {
3977 ReservedCacheNodesOffering {
3978 reserved_cache_nodes_offering_id: id.to_string(),
3979 cache_node_type: "cache.t3.micro".to_string(),
3980 duration: 31_536_000,
3981 fixed_price: 0.0,
3982 usage_price: 0.011,
3983 product_description: "redis".to_string(),
3984 offering_type: "No Upfront".to_string(),
3985 recurring_charges: Vec::new(),
3986 }
3987 }
3988
3989 fn sample_reserved_cache_node(id: &str, offering_id: &str) -> ReservedCacheNode {
3990 ReservedCacheNode {
3991 reserved_cache_node_id: id.to_string(),
3992 reserved_cache_nodes_offering_id: offering_id.to_string(),
3993 cache_node_type: "cache.t3.micro".to_string(),
3994 start_time: "2024-01-01T00:00:00Z".to_string(),
3995 duration: 31_536_000,
3996 fixed_price: 0.0,
3997 usage_price: 0.011,
3998 cache_node_count: 1,
3999 product_description: "redis".to_string(),
4000 offering_type: "No Upfront".to_string(),
4001 state: "payment-pending".to_string(),
4002 recurring_charges: Vec::new(),
4003 reservation_arn: "arn:aws:elasticache:us-east-1:123456789012:reserved-instance:test"
4004 .to_string(),
4005 }
4006 }
4007
4008 #[test]
4009 fn parse_member_list_extracts_indexed_values() {
4010 let mut params = HashMap::new();
4011 params.insert(
4012 "SubnetIds.SubnetIdentifier.1".to_string(),
4013 "subnet-aaa".to_string(),
4014 );
4015 params.insert(
4016 "SubnetIds.SubnetIdentifier.2".to_string(),
4017 "subnet-bbb".to_string(),
4018 );
4019 params.insert(
4020 "SubnetIds.SubnetIdentifier.3".to_string(),
4021 "subnet-ccc".to_string(),
4022 );
4023 params.insert("OtherParam".to_string(), "ignored".to_string());
4024
4025 let result = parse_member_list(¶ms, "SubnetIds", "SubnetIdentifier");
4026 assert_eq!(result, vec!["subnet-aaa", "subnet-bbb", "subnet-ccc"]);
4027 }
4028
/// Expects the entry with index 1 to come before the entry with index 3 in
/// the list returned by `parse_member_list`.
#[test]
fn parse_member_list_returns_sorted_by_index() {
    let params: HashMap<String, String> = [
        ("SubnetIds.SubnetIdentifier.3", "subnet-ccc"),
        ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
    ]
    .iter()
    .map(|&(key, value)| (key.to_string(), value.to_string()))
    .collect();

    let subnets = parse_member_list(&params, "SubnetIds", "SubnetIdentifier");
    assert_eq!(subnets, vec!["subnet-aaa", "subnet-ccc"]);
}
4044
/// An empty parameter map must yield an empty member list.
#[test]
fn parse_member_list_returns_empty_for_no_matches() {
    let subnets = parse_member_list(&HashMap::new(), "SubnetIds", "SubnetIdentifier");
    assert!(subnets.is_empty());
}
4051
/// Renders a subnet group with two subnets and checks that the XML carries the
/// name, description, VPC id, both subnet identifiers, the `us-east-1a` and
/// `us-east-1b` zone names, and the ARN.
#[test]
fn cache_subnet_group_xml_contains_all_fields() {
    let group = CacheSubnetGroup {
        cache_subnet_group_name: String::from("my-group"),
        cache_subnet_group_description: String::from("My description"),
        vpc_id: String::from("vpc-123"),
        subnet_ids: ["subnet-aaa", "subnet-bbb"]
            .iter()
            .map(|id| id.to_string())
            .collect(),
        arn: String::from("arn:aws:elasticache:us-east-1:123:subnetgroup:my-group"),
    };

    let xml = cache_subnet_group_xml(&group, "us-east-1");

    let expected_fragments = [
        "<CacheSubnetGroupName>my-group</CacheSubnetGroupName>",
        "<CacheSubnetGroupDescription>My description</CacheSubnetGroupDescription>",
        "<VpcId>vpc-123</VpcId>",
        "<SubnetIdentifier>subnet-aaa</SubnetIdentifier>",
        "<SubnetIdentifier>subnet-bbb</SubnetIdentifier>",
        "<Name>us-east-1a</Name>",
        "<Name>us-east-1b</Name>",
        "<ARN>arn:aws:elasticache:us-east-1:123:subnetgroup:my-group</ARN>",
    ];
    for fragment in expected_fragments.iter() {
        assert!(xml.contains(fragment), "missing fragment: {}", fragment);
    }
}
4072
/// Renders a two-node cluster that belongs to replication group `rg-1`, passing
/// `true` as the second argument of `cache_cluster_xml`, and checks the
/// identifying fields, the `<CacheNodes>` section and the ARN.
#[test]
fn cache_cluster_xml_contains_expected_fields() {
    let cluster = CacheCluster {
        cache_cluster_id: String::from("classic-1"),
        cache_node_type: String::from("cache.t3.micro"),
        engine: String::from("redis"),
        engine_version: String::from("7.1"),
        cache_cluster_status: String::from("available"),
        num_cache_nodes: 2,
        preferred_availability_zone: String::from("us-east-1a"),
        cache_subnet_group_name: Some(String::from("default")),
        auto_minor_version_upgrade: true,
        arn: String::from("arn:aws:elasticache:us-east-1:123:cluster:classic-1"),
        created_at: String::from("2024-01-01T00:00:00Z"),
        endpoint_address: String::from("127.0.0.1"),
        endpoint_port: 6379,
        container_id: String::from("abc123"),
        host_port: 6379,
        replication_group_id: Some(String::from("rg-1")),
    };

    let xml = cache_cluster_xml(&cluster, true);

    let expected_fragments = [
        "<CacheClusterId>classic-1</CacheClusterId>",
        "<CacheNodeType>cache.t3.micro</CacheNodeType>",
        "<Engine>redis</Engine>",
        "<NumCacheNodes>2</NumCacheNodes>",
        "<PreferredAvailabilityZone>us-east-1a</PreferredAvailabilityZone>",
        "<CacheSubnetGroupName>default</CacheSubnetGroupName>",
        "<CacheNodes>",
        "<CacheNodeId>0001</CacheNodeId>",
        "<ReplicationGroupId>rg-1</ReplicationGroupId>",
        "<ARN>arn:aws:elasticache:us-east-1:123:cluster:classic-1</ARN>",
    ];
    for fragment in expected_fragments.iter() {
        assert!(xml.contains(fragment), "missing fragment: {}", fragment);
    }
}
4106
/// Filtering the default engine versions by engine `redis` must leave exactly
/// one entry, and that entry's engine must be `redis`.
#[test]
fn filter_engine_versions_by_engine() {
    let catalog = default_engine_versions();
    let engine_filter = Some(String::from("redis"));

    let matches = filter_engine_versions(&catalog, &engine_filter, &None, &None);

    assert_eq!(matches.len(), 1);
    assert_eq!(matches[0].engine, "redis");
}
4114
/// Filtering the default engine versions by the `valkey8` value (fourth
/// argument) must leave exactly one entry whose engine is `valkey`.
#[test]
fn filter_engine_versions_by_family() {
    let catalog = default_engine_versions();
    let family_filter = Some(String::from("valkey8"));

    let matches = filter_engine_versions(&catalog, &None, &None, &family_filter);

    assert_eq!(matches.len(), 1);
    assert_eq!(matches[0].engine, "valkey");
}
4123
/// Filtering the default engine versions by engine `memcached` must return
/// nothing.
#[test]
fn filter_engine_versions_no_match() {
    let catalog = default_engine_versions();
    let engine_filter = Some(String::from("memcached"));

    let matches = filter_engine_versions(&catalog, &engine_filter, &None, &None);

    assert!(matches.is_empty());
}
4131
/// With neither a marker nor a record limit, `paginate` must hand back every
/// item and no continuation marker.
#[test]
fn paginate_returns_all_when_within_limit() {
    let items = vec![1, 2, 3];

    let (page, next_marker) = paginate(&items, None, None);

    assert!(next_marker.is_none());
    assert_eq!(page, vec![1, 2, 3]);
}
4139
/// Walks five items two at a time, feeding the expected marker of each page
/// into the next call, and checks both the page contents and the marker
/// returned at every step (the last page returns no marker).
#[test]
fn paginate_respects_max_records() {
    let items = vec![1, 2, 3, 4, 5];

    // (marker passed in, expected page, expected marker returned)
    let steps = vec![
        (None, vec![1, 2], Some("2".to_string())),
        (Some("2"), vec![3, 4], Some("4".to_string())),
        (Some("4"), vec![5], None),
    ];

    for (marker_in, expected_page, expected_marker) in steps {
        let (page, marker_out) = paginate(&items, marker_in, Some(2));
        assert_eq!(page, expected_page);
        assert_eq!(marker_out, expected_marker);
    }
}
4155
/// The filter value `"1"` must parse to 31,536,000 and the value `"94608000"`
/// must parse to 94,608,000.
#[test]
fn parse_reserved_duration_filter_accepts_years_and_seconds() {
    let cases = vec![("1", 31_536_000), ("94608000", 94_608_000)];

    for (raw, expected) in cases {
        let parsed = parse_reserved_duration_filter(Some(raw.to_string())).unwrap();
        assert_eq!(parsed, Some(expected));
    }
}
4167
/// The filter value `"2"` must be rejected with an error.
#[test]
fn parse_reserved_duration_filter_rejects_invalid_value() {
    let outcome = parse_reserved_duration_filter(Some(String::from("2")));
    assert!(outcome.is_err());
}
4172
/// `xml_wrap` must emit the `<ActionResponse` and `<ActionResult>` elements,
/// the request id, and the ElastiCache namespace.
#[test]
fn xml_wrap_produces_valid_response() {
    let xml = xml_wrap("TestAction", "<Data>ok</Data>", "req-123");

    let expected_fragments = [
        "<TestActionResponse",
        "<TestActionResult>",
        "<RequestId>req-123</RequestId>",
        ELASTICACHE_NS,
    ];
    for fragment in expected_fragments.iter() {
        assert!(xml.contains(fragment), "missing fragment: {}", fragment);
    }
}
4181
/// Two `Tags.Tag.N.Key` / `Tags.Tag.N.Value` pairs must be parsed into two
/// `(key, value)` tuples in index order.
#[test]
fn parse_tags_reads_query_shape() {
    let query = [
        ("Tags.Tag.1.Key", "env"),
        ("Tags.Tag.1.Value", "prod"),
        ("Tags.Tag.2.Key", "team"),
        ("Tags.Tag.2.Value", "backend"),
    ];
    let req = request("AddTagsToResource", &query);

    let parsed = parse_tags(&req).expect("tags");

    let expected = vec![
        (String::from("env"), String::from("prod")),
        (String::from("team"), String::from("backend")),
    ];
    assert_eq!(parsed, expected);
}
4203
/// A request without any tag parameters must parse to an empty tag list.
#[test]
fn parse_tags_returns_empty_for_no_tags() {
    let parsed = parse_tags(&request("AddTagsToResource", &[])).expect("tags");
    assert!(parsed.is_empty());
}
4210
/// `TagKeys.member.N` parameters must be parsed into the list of keys in
/// index order.
#[test]
fn parse_tag_keys_reads_member_shape() {
    let query = [("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")];
    let req = request("RemoveTagsFromResource", &query);

    let parsed = parse_tag_keys(&req).expect("tag keys");

    assert_eq!(parsed, vec![String::from("env"), String::from("team")]);
}
4221
/// Merging `env=prod` and `team=core` into a list that already holds
/// `env=dev` must overwrite the `env` value in place and append `team`.
#[test]
fn merge_tags_adds_new_and_updates_existing() {
    // Small constructor so the fixtures below read as plain key/value pairs.
    let tag = |key: &str, value: &str| (key.to_string(), value.to_string());

    let mut current = vec![tag("env", "dev")];
    let incoming = [tag("env", "prod"), tag("team", "core")];

    merge_tags(&mut current, &incoming);

    assert_eq!(current, vec![tag("env", "prod"), tag("team", "core")]);
}
4242
/// A single `(key, value)` tag must render as one `<Tag>` element containing
/// `<Key>` and `<Value>` children.
#[test]
fn tag_xml_produces_valid_element() {
    let tag = (String::from("env"), String::from("prod"));
    assert_eq!(
        tag_xml(&tag),
        "<Tag><Key>env</Key><Value>prod</Value></Tag>"
    );
}
4248
/// Renders an "All Upfront" offering with one hourly recurring charge and
/// checks the offering id, node type, duration, offering type and recurring
/// charge frequency in the XML.
#[test]
fn reserved_cache_nodes_offering_xml_contains_expected_fields() {
    let hourly_charge = RecurringCharge {
        recurring_charge_amount: 0.0,
        recurring_charge_frequency: String::from("Hourly"),
    };
    let offering = ReservedCacheNodesOffering {
        reserved_cache_nodes_offering_id: String::from("offering-a"),
        cache_node_type: String::from("cache.r6g.large"),
        duration: 94_608_000,
        fixed_price: 1550.0,
        usage_price: 0.0,
        product_description: String::from("redis"),
        offering_type: String::from("All Upfront"),
        recurring_charges: vec![hourly_charge],
    };

    let xml = reserved_cache_nodes_offering_xml(&offering);

    let expected_fragments = [
        "<ReservedCacheNodesOfferingId>offering-a</ReservedCacheNodesOfferingId>",
        "<CacheNodeType>cache.r6g.large</CacheNodeType>",
        "<Duration>94608000</Duration>",
        "<OfferingType>All Upfront</OfferingType>",
        "<RecurringChargeFrequency>Hourly</RecurringChargeFrequency>",
    ];
    for fragment in expected_fragments.iter() {
        assert!(xml.contains(fragment), "missing fragment: {}", fragment);
    }
}
4272
/// Renders the sample reserved node and checks the node id, offering id,
/// start time, state and the presence of a `<ReservationARN>` element.
#[test]
fn reserved_cache_node_xml_contains_expected_fields() {
    let node = sample_reserved_cache_node("rcn-a", "offering-a");

    let xml = reserved_cache_node_xml(&node);

    let expected_fragments = [
        "<ReservedCacheNodeId>rcn-a</ReservedCacheNodeId>",
        "<ReservedCacheNodesOfferingId>offering-a</ReservedCacheNodesOfferingId>",
        "<StartTime>2024-01-01T00:00:00Z</StartTime>",
        "<State>payment-pending</State>",
        "<ReservationARN>",
    ];
    for fragment in expected_fragments.iter() {
        assert!(xml.contains(fragment), "missing fragment: {}", fragment);
    }
}
4284
/// Renders a password-authenticated user that belongs to `group1` and checks
/// that each of its fields shows up in the XML.
#[test]
fn user_xml_contains_all_fields() {
    let user = ElastiCacheUser {
        user_id: String::from("myuser"),
        user_name: String::from("myuser"),
        engine: String::from("redis"),
        access_string: String::from("on ~* +@all"),
        status: String::from("active"),
        authentication_type: String::from("password"),
        password_count: 1,
        arn: String::from("arn:aws:elasticache:us-east-1:123:user:myuser"),
        minimum_engine_version: String::from("6.0"),
        user_group_ids: vec![String::from("group1")],
    };

    let xml = user_xml(&user);

    let expected_fragments = [
        "<UserId>myuser</UserId>",
        "<UserName>myuser</UserName>",
        "<Engine>redis</Engine>",
        "<AccessString>on ~* +@all</AccessString>",
        "<Status>active</Status>",
        "<Type>password</Type>",
        "<PasswordCount>1</PasswordCount>",
        "<member>group1</member>",
        "<ARN>arn:aws:elasticache:us-east-1:123:user:myuser</ARN>",
    ];
    for fragment in expected_fragments.iter() {
        assert!(xml.contains(fragment), "missing fragment: {}", fragment);
    }
}
4310
/// Renders a user group with two members and checks the id, engine, status,
/// both `<member>` entries and the ARN in the XML.
#[test]
fn user_group_xml_contains_all_fields() {
    let group = ElastiCacheUserGroup {
        user_group_id: String::from("mygroup"),
        engine: String::from("redis"),
        status: String::from("active"),
        user_ids: vec![String::from("default"), String::from("myuser")],
        arn: String::from("arn:aws:elasticache:us-east-1:123:usergroup:mygroup"),
        minimum_engine_version: String::from("6.0"),
        pending_changes: None,
        replication_groups: vec![],
    };

    let xml = user_group_xml(&group);

    let expected_fragments = [
        "<UserGroupId>mygroup</UserGroupId>",
        "<Engine>redis</Engine>",
        "<Status>active</Status>",
        "<member>default</member>",
        "<member>myuser</member>",
        "<ARN>arn:aws:elasticache:us-east-1:123:usergroup:mygroup</ARN>",
    ];
    for fragment in expected_fragments.iter() {
        assert!(xml.contains(fragment), "missing fragment: {}", fragment);
    }
}
4331
/// Creating `testuser` on a fresh state must succeed and return a
/// `CreateUserResponse` body that reports the user as `active`.
#[test]
fn create_user_returns_user_xml() {
    let service = ElastiCacheService::new(std::sync::Arc::new(parking_lot::RwLock::new(
        crate::state::ElastiCacheState::new("123456789012", "us-east-1"),
    )));

    let query = [
        ("UserId", "testuser"),
        ("UserName", "testuser"),
        ("Engine", "redis"),
        ("AccessString", "on ~* +@all"),
    ];
    let response = service.create_user(&request("CreateUser", &query)).unwrap();
    let body = String::from_utf8(response.body.expect_bytes().to_vec()).unwrap();

    let expected_fragments = [
        "<UserId>testuser</UserId>",
        "<Status>active</Status>",
        "<CreateUserResponse",
    ];
    for fragment in expected_fragments.iter() {
        assert!(body.contains(fragment), "missing fragment: {}", fragment);
    }
}
4353
/// Creating a user with the id `default` on a fresh state must fail.
#[test]
fn create_user_rejects_duplicate() {
    let service = ElastiCacheService::new(std::sync::Arc::new(parking_lot::RwLock::new(
        crate::state::ElastiCacheState::new("123456789012", "us-east-1"),
    )));

    let query = [
        ("UserId", "default"),
        ("UserName", "default"),
        ("Engine", "redis"),
        ("AccessString", "on ~* +@all"),
    ];
    let outcome = service.create_user(&request("CreateUser", &query));

    assert!(outcome.is_err());
}
4371
/// Deleting the user with id `default` must be refused.
#[test]
fn delete_user_rejects_default() {
    let service = ElastiCacheService::new(std::sync::Arc::new(parking_lot::RwLock::new(
        crate::state::ElastiCacheState::new("123456789012", "us-east-1"),
    )));

    let outcome = service.delete_user(&request("DeleteUser", &[("UserId", "default")]));

    assert!(outcome.is_err());
}
4381
/// On a fresh state, `DescribeUsers` without filters must list the `default`
/// user.
#[test]
fn describe_users_returns_default_user() {
    let service = ElastiCacheService::new(std::sync::Arc::new(parking_lot::RwLock::new(
        crate::state::ElastiCacheState::new("123456789012", "us-east-1"),
    )));

    let response = service
        .describe_users(&request("DescribeUsers", &[]))
        .unwrap();
    let body = String::from_utf8(response.body.expect_bytes().to_vec()).unwrap();

    assert!(body.contains("<UserId>default</UserId>"));
}
4393
/// On a fresh state, `DescribeReservedCacheNodes` must return an empty
/// `<ReservedCacheNodes>` element.
#[test]
fn describe_reserved_cache_nodes_returns_empty_list_by_default() {
    let service = ElastiCacheService::new(std::sync::Arc::new(parking_lot::RwLock::new(
        crate::state::ElastiCacheState::new("123456789012", "us-east-1"),
    )));

    let req = request("DescribeReservedCacheNodes", &[]);
    let response = service.describe_reserved_cache_nodes(&req).unwrap();
    let body = String::from_utf8(response.body.expect_bytes().to_vec()).unwrap();

    assert!(body.contains("<ReservedCacheNodes></ReservedCacheNodes>"));
}
4406
/// Seeds two reserved nodes tied to different offerings and checks that
/// filtering by `ReservedCacheNodesOfferingId=offering-b` returns only `rcn-b`.
#[test]
fn describe_reserved_cache_nodes_filters_by_offering_id() {
    let service = ElastiCacheService::new(std::sync::Arc::new(parking_lot::RwLock::new(
        crate::state::ElastiCacheState::new("123456789012", "us-east-1"),
    )));

    let mut guard = service.state.write();
    for &(node_id, offering_id) in [("rcn-a", "offering-a"), ("rcn-b", "offering-b")].iter() {
        guard.reserved_cache_nodes.insert(
            node_id.to_string(),
            sample_reserved_cache_node(node_id, offering_id),
        );
    }
    // Release the write lock before the service is called.
    drop(guard);

    let req = request(
        "DescribeReservedCacheNodes",
        &[("ReservedCacheNodesOfferingId", "offering-b")],
    );
    let response = service.describe_reserved_cache_nodes(&req).unwrap();
    let body = String::from_utf8(response.body.expect_bytes().to_vec()).unwrap();

    assert!(body.contains("<ReservedCacheNodeId>rcn-b</ReservedCacheNodeId>"));
    assert!(!body.contains("<ReservedCacheNodeId>rcn-a</ReservedCacheNodeId>"));
}
4434
/// Asking for the reserved node id `missing` on a fresh state must return an
/// error.
#[test]
fn describe_reserved_cache_nodes_not_found_by_id() {
    let service = ElastiCacheService::new(std::sync::Arc::new(parking_lot::RwLock::new(
        crate::state::ElastiCacheState::new("123456789012", "us-east-1"),
    )));

    let req = request(
        "DescribeReservedCacheNodes",
        &[("ReservedCacheNodeId", "missing")],
    );
    let outcome = service.describe_reserved_cache_nodes(&req);

    assert!(outcome.is_err());
}
4448
/// Replaces the offerings list with three fixtures, then checks two things:
/// filtering by `ProductDescription=redis` and `Duration=3` returns
/// `offering-c` but not `offering-b`, and `MaxRecords=1` produces a
/// `<Marker>1</Marker>` element.
#[test]
fn describe_reserved_cache_nodes_offerings_filters_and_paginates() {
    let service = ElastiCacheService::new(std::sync::Arc::new(parking_lot::RwLock::new(
        crate::state::ElastiCacheState::new("123456789012", "us-east-1"),
    )));

    let memcached_offering = ReservedCacheNodesOffering {
        reserved_cache_nodes_offering_id: String::from("offering-b"),
        cache_node_type: String::from("cache.m5.large"),
        duration: 94_608_000,
        fixed_price: 0.0,
        usage_price: 0.033,
        product_description: String::from("memcached"),
        offering_type: String::from("No Upfront"),
        recurring_charges: vec![],
    };
    let upfront_redis_offering = ReservedCacheNodesOffering {
        reserved_cache_nodes_offering_id: String::from("offering-c"),
        cache_node_type: String::from("cache.r6g.large"),
        duration: 94_608_000,
        fixed_price: 1_550.0,
        usage_price: 0.0,
        product_description: String::from("redis"),
        offering_type: String::from("All Upfront"),
        recurring_charges: vec![RecurringCharge {
            recurring_charge_amount: 0.0,
            recurring_charge_frequency: String::from("Hourly"),
        }],
    };
    let offerings = vec![
        sample_reserved_cache_node_offering("offering-a"),
        memcached_offering,
        upfront_redis_offering,
    ];
    // The write guard is a temporary and is released at the end of this statement.
    service.state.write().reserved_cache_nodes_offerings = offerings;

    let filter_req = request(
        "DescribeReservedCacheNodesOfferings",
        &[("ProductDescription", "redis"), ("Duration", "3")],
    );
    let filtered = service
        .describe_reserved_cache_nodes_offerings(&filter_req)
        .unwrap();
    let filtered_body = String::from_utf8(filtered.body.expect_bytes().to_vec()).unwrap();
    assert!(filtered_body
        .contains("<ReservedCacheNodesOfferingId>offering-c</ReservedCacheNodesOfferingId>"));
    assert!(!filtered_body
        .contains("<ReservedCacheNodesOfferingId>offering-b</ReservedCacheNodesOfferingId>"));

    let page_req = request(
        "DescribeReservedCacheNodesOfferings",
        &[("MaxRecords", "1")],
    );
    let paged = service
        .describe_reserved_cache_nodes_offerings(&page_req)
        .unwrap();
    let paged_body = String::from_utf8(paged.body.expect_bytes().to_vec()).unwrap();
    assert!(paged_body.contains("<Marker>1</Marker>"));
}
4505
/// Asking for the offering id `missing` on a fresh state must return an error.
#[test]
fn describe_reserved_cache_nodes_offerings_not_found_by_id() {
    let service = ElastiCacheService::new(std::sync::Arc::new(parking_lot::RwLock::new(
        crate::state::ElastiCacheState::new("123456789012", "us-east-1"),
    )));

    let req = request(
        "DescribeReservedCacheNodesOfferings",
        &[("ReservedCacheNodesOfferingId", "missing")],
    );
    let outcome = service.describe_reserved_cache_nodes_offerings(&req);

    assert!(outcome.is_err());
}
4519
/// Creates `mygroup` with the `default` user as its only member, then checks
/// that both the create response and a later `DescribeUserGroups` mention it.
#[test]
fn create_and_describe_user_group() {
    let service = ElastiCacheService::new(std::sync::Arc::new(parking_lot::RwLock::new(
        crate::state::ElastiCacheState::new("123456789012", "us-east-1"),
    )));

    let create_req = request(
        "CreateUserGroup",
        &[
            ("UserGroupId", "mygroup"),
            ("Engine", "redis"),
            ("UserIds.member.1", "default"),
        ],
    );
    let created = service.create_user_group(&create_req).unwrap();
    let created_body = String::from_utf8(created.body.expect_bytes().to_vec()).unwrap();
    assert!(created_body.contains("<UserGroupId>mygroup</UserGroupId>"));
    assert!(created_body.contains("<member>default</member>"));

    let described = service
        .describe_user_groups(&request("DescribeUserGroups", &[]))
        .unwrap();
    let described_body = String::from_utf8(described.body.expect_bytes().to_vec()).unwrap();
    assert!(described_body.contains("<UserGroupId>mygroup</UserGroupId>"));
}
4544
/// Creating a user group that lists the member id `nonexistent` must fail on
/// a fresh state.
#[test]
fn create_user_group_rejects_unknown_user() {
    let service = ElastiCacheService::new(std::sync::Arc::new(parking_lot::RwLock::new(
        crate::state::ElastiCacheState::new("123456789012", "us-east-1"),
    )));

    let query = [
        ("UserGroupId", "mygroup"),
        ("Engine", "redis"),
        ("UserIds.member.1", "nonexistent"),
    ];
    let outcome = service.create_user_group(&request("CreateUserGroup", &query));

    assert!(outcome.is_err());
}
4561
/// Creates `delgroup`, deletes it (the response must report status
/// `deleting`), and then checks that describing it by id returns an error.
#[test]
fn delete_user_group_removes_from_state() {
    let service = ElastiCacheService::new(std::sync::Arc::new(parking_lot::RwLock::new(
        crate::state::ElastiCacheState::new("123456789012", "us-east-1"),
    )));
    let group_filter = [("UserGroupId", "delgroup")];

    let create_req = request(
        "CreateUserGroup",
        &[("UserGroupId", "delgroup"), ("Engine", "redis")],
    );
    service.create_user_group(&create_req).unwrap();

    let deleted = service
        .delete_user_group(&request("DeleteUserGroup", &group_filter))
        .unwrap();
    let deleted_body = String::from_utf8(deleted.body.expect_bytes().to_vec()).unwrap();
    assert!(deleted_body.contains("<Status>deleting</Status>"));

    let lookup = service.describe_user_groups(&request("DescribeUserGroups", &group_filter));
    assert!(lookup.is_err());
}
4582
4583 fn service_with_cache_cluster(cluster_id: &str) -> ElastiCacheService {
4584 let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4585 let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4586 {
4587 let mut s = shared.write();
4588 let arn = format!("arn:aws:elasticache:us-east-1:123456789012:cluster:{cluster_id}");
4589 s.tags.insert(arn.clone(), Vec::new());
4590 s.cache_clusters.insert(
4591 cluster_id.to_string(),
4592 CacheCluster {
4593 cache_cluster_id: cluster_id.to_string(),
4594 cache_node_type: "cache.t3.micro".to_string(),
4595 engine: "redis".to_string(),
4596 engine_version: "7.1".to_string(),
4597 cache_cluster_status: "available".to_string(),
4598 num_cache_nodes: 1,
4599 preferred_availability_zone: "us-east-1a".to_string(),
4600 cache_subnet_group_name: Some("default".to_string()),
4601 auto_minor_version_upgrade: true,
4602 arn,
4603 created_at: "2024-01-01T00:00:00Z".to_string(),
4604 endpoint_address: "127.0.0.1".to_string(),
4605 endpoint_port: 6379,
4606 container_id: "abc123".to_string(),
4607 host_port: 6379,
4608 replication_group_id: None,
4609 },
4610 );
4611 }
4612 ElastiCacheService::new(shared)
4613 }
4614
/// Adds a second (valkey) cluster next to the fixture cluster and checks that
/// an unfiltered `DescribeCacheClusters` lists both of them.
#[test]
fn describe_cache_clusters_returns_all() {
    let service = service_with_cache_cluster("cluster-a");

    let second_arn = String::from("arn:aws:elasticache:us-east-1:123456789012:cluster:cluster-b");
    let second_cluster = CacheCluster {
        cache_cluster_id: String::from("cluster-b"),
        cache_node_type: String::from("cache.t3.micro"),
        engine: String::from("valkey"),
        engine_version: String::from("8.0"),
        cache_cluster_status: String::from("available"),
        num_cache_nodes: 2,
        preferred_availability_zone: String::from("us-east-1b"),
        cache_subnet_group_name: Some(String::from("default")),
        auto_minor_version_upgrade: false,
        arn: second_arn.clone(),
        created_at: String::from("2024-01-02T00:00:00Z"),
        endpoint_address: String::from("127.0.0.1"),
        endpoint_port: 6380,
        container_id: String::from("def456"),
        host_port: 6380,
        replication_group_id: None,
    };

    let mut guard = service.state.write();
    guard.tags.insert(second_arn, Vec::new());
    guard
        .cache_clusters
        .insert(String::from("cluster-b"), second_cluster);
    // Release the write lock before the service is called.
    drop(guard);

    let response = service
        .describe_cache_clusters(&request("DescribeCacheClusters", &[]))
        .unwrap();
    let body = String::from_utf8(response.body.expect_bytes().to_vec()).unwrap();

    let expected_fragments = [
        "<CacheClusterId>cluster-a</CacheClusterId>",
        "<CacheClusterId>cluster-b</CacheClusterId>",
        "<DescribeCacheClustersResponse",
    ];
    for fragment in expected_fragments.iter() {
        assert!(body.contains(fragment), "missing fragment: {}", fragment);
    }
}
4652
4653 #[tokio::test]
4654 async fn create_cache_cluster_validates_engine_before_runtime() {
4655 let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4656 let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4657 let service = ElastiCacheService::new(shared);
4658
4659 let req = request(
4660 "CreateCacheCluster",
4661 &[("CacheClusterId", "bad-engine"), ("Engine", "memcached")],
4662 );
4663 assert!(service.create_cache_cluster(&req).await.is_err());
4664 }
4665
4666 #[tokio::test]
4667 async fn create_cache_cluster_without_runtime_cancels_reservation() {
4668 let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4669 let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4670 let service = ElastiCacheService::new(shared.clone());
4671
4672 let req = request("CreateCacheCluster", &[("CacheClusterId", "no-runtime")]);
4673 assert!(service.create_cache_cluster(&req).await.is_err());
4674
4675 let mut state = shared.write();
4676 assert!(state.begin_cache_cluster_creation("no-runtime"));
4677 }
4678
/// Describing the fixture cluster by id with `ShowCacheNodeInfo=true` must
/// include the cluster id, a `<CacheNodes>` section with node `0001`, and an
/// `in-sync` parameter group status.
#[test]
fn describe_cache_clusters_filters_by_id_and_shows_node_info() {
    let service = service_with_cache_cluster("nodeful-cluster");

    let query = [
        ("CacheClusterId", "nodeful-cluster"),
        ("ShowCacheNodeInfo", "true"),
    ];
    let response = service
        .describe_cache_clusters(&request("DescribeCacheClusters", &query))
        .unwrap();
    let body = String::from_utf8(response.body.expect_bytes().to_vec()).unwrap();

    let expected_fragments = [
        "<CacheClusterId>nodeful-cluster</CacheClusterId>",
        "<CacheNodes>",
        "<CacheNodeId>0001</CacheNodeId>",
        "<ParameterGroupStatus>in-sync</ParameterGroupStatus>",
    ];
    for fragment in expected_fragments.iter() {
        assert!(body.contains(fragment), "missing fragment: {}", fragment);
    }
}
4696
/// Describing the cluster id `missing` must return an error when only
/// `cluster-a` exists.
#[test]
fn describe_cache_clusters_not_found() {
    let service = service_with_cache_cluster("cluster-a");

    let outcome = service.describe_cache_clusters(&request(
        "DescribeCacheClusters",
        &[("CacheClusterId", "missing")],
    ));

    assert!(outcome.is_err());
}
4703
4704 #[tokio::test]
4705 async fn delete_cache_cluster_removes_state_and_tags() {
4706 let service = service_with_cache_cluster("delete-me");
4707 let arn = "arn:aws:elasticache:us-east-1:123456789012:cluster:delete-me".to_string();
4708
4709 let req = request("DeleteCacheCluster", &[("CacheClusterId", "delete-me")]);
4710 let resp = service.delete_cache_cluster(&req).await.unwrap();
4711 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
4712 assert!(body.contains("<CacheClusterStatus>deleting</CacheClusterStatus>"));
4713 assert!(body.contains("<DeleteCacheClusterResponse"));
4714 assert!(!service
4715 .state
4716 .read()
4717 .cache_clusters
4718 .contains_key("delete-me"));
4719 assert!(!service.state.read().tags.contains_key(&arn));
4720 }
4721
/// Adding `manual-cluster` to a one-member replication group must append it
/// to `member_clusters` and raise `num_cache_clusters` to 2.
#[test]
fn add_cluster_to_replication_group_updates_members_and_count() {
    let group = ReplicationGroup {
        replication_group_id: String::from("rg-1"),
        description: String::from("test group"),
        global_replication_group_id: None,
        global_replication_group_role: None,
        status: String::from("available"),
        cache_node_type: String::from("cache.t3.micro"),
        engine: String::from("redis"),
        engine_version: String::from("7.1"),
        num_cache_clusters: 1,
        automatic_failover_enabled: false,
        endpoint_address: String::from("127.0.0.1"),
        endpoint_port: 6379,
        arn: String::from("arn:aws:elasticache:us-east-1:123456789012:replicationgroup:rg-1"),
        created_at: String::from("2024-01-01T00:00:00Z"),
        container_id: String::from("abc123"),
        host_port: 6379,
        member_clusters: vec![String::from("rg-1-001")],
        snapshot_retention_limit: 0,
        snapshot_window: String::from("05:00-09:00"),
    };
    let mut state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
    state.replication_groups.insert(String::from("rg-1"), group);

    add_cluster_to_replication_group(&mut state, "rg-1", "manual-cluster");

    let updated = state.replication_groups.get("rg-1").unwrap();
    assert_eq!(updated.member_clusters, vec!["rg-1-001", "manual-cluster"]);
    assert_eq!(updated.num_cache_clusters, 2);
}
4756
4757 #[tokio::test]
4758 async fn delete_cache_cluster_removes_cluster_from_replication_group() {
4759 let service = service_with_cache_cluster("delete-rg-cluster");
4760 {
4761 let mut state = service.state.write();
4762 state
4763 .cache_clusters
4764 .get_mut("delete-rg-cluster")
4765 .unwrap()
4766 .replication_group_id = Some("delete-rg".to_string());
4767 state.replication_groups.insert(
4768 "delete-rg".to_string(),
4769 ReplicationGroup {
4770 replication_group_id: "delete-rg".to_string(),
4771 description: "test group".to_string(),
4772 global_replication_group_id: None,
4773 global_replication_group_role: None,
4774 status: "available".to_string(),
4775 cache_node_type: "cache.t3.micro".to_string(),
4776 engine: "redis".to_string(),
4777 engine_version: "7.1".to_string(),
4778 num_cache_clusters: 2,
4779 automatic_failover_enabled: false,
4780 endpoint_address: "127.0.0.1".to_string(),
4781 endpoint_port: 6379,
4782 arn: "arn:aws:elasticache:us-east-1:123456789012:replicationgroup:delete-rg"
4783 .to_string(),
4784 created_at: "2024-01-01T00:00:00Z".to_string(),
4785 container_id: "abc123".to_string(),
4786 host_port: 6379,
4787 member_clusters: vec![
4788 "delete-rg-001".to_string(),
4789 "delete-rg-cluster".to_string(),
4790 ],
4791 snapshot_retention_limit: 0,
4792 snapshot_window: "05:00-09:00".to_string(),
4793 },
4794 );
4795 }
4796
4797 let req = request(
4798 "DeleteCacheCluster",
4799 &[("CacheClusterId", "delete-rg-cluster")],
4800 );
4801 service.delete_cache_cluster(&req).await.unwrap();
4802
4803 let group = service
4804 .state
4805 .read()
4806 .replication_groups
4807 .get("delete-rg")
4808 .unwrap()
4809 .clone();
4810 assert_eq!(group.member_clusters, vec!["delete-rg-001"]);
4811 assert_eq!(group.num_cache_clusters, 1);
4812 }
4813
/// `CreateSnapshot` that targets the fixture cluster (which has no replication
/// group) by `CacheClusterId` must return an error.
#[test]
fn create_snapshot_rejects_standalone_cache_cluster_id() {
    let service = service_with_cache_cluster("standalone");

    let query = [
        ("SnapshotName", "standalone-snap"),
        ("CacheClusterId", "standalone"),
    ];
    let outcome = service.create_snapshot(&request("CreateSnapshot", &query));

    assert!(outcome.is_err());
}
4826
4827 fn service_with_replication_group(group_id: &str, num_clusters: i32) -> ElastiCacheService {
4828 let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4829 let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4830 {
4831 let mut s = shared.write();
4832 let member_clusters: Vec<String> = (1..=num_clusters)
4833 .map(|i| format!("{group_id}-{i:03}"))
4834 .collect();
4835 let arn =
4836 format!("arn:aws:elasticache:us-east-1:123456789012:replicationgroup:{group_id}");
4837 s.tags.insert(arn.clone(), Vec::new());
4838 s.replication_groups.insert(
4839 group_id.to_string(),
4840 ReplicationGroup {
4841 replication_group_id: group_id.to_string(),
4842 description: "test group".to_string(),
4843 global_replication_group_id: None,
4844 global_replication_group_role: None,
4845 status: "available".to_string(),
4846 cache_node_type: "cache.t3.micro".to_string(),
4847 engine: "redis".to_string(),
4848 engine_version: "7.1".to_string(),
4849 num_cache_clusters: num_clusters,
4850 automatic_failover_enabled: false,
4851 endpoint_address: "127.0.0.1".to_string(),
4852 endpoint_port: 6379,
4853 arn,
4854 created_at: "2024-01-01T00:00:00Z".to_string(),
4855 container_id: "abc123".to_string(),
4856 host_port: 6379,
4857 member_clusters,
4858 snapshot_retention_limit: 0,
4859 snapshot_window: "05:00-09:00".to_string(),
4860 },
4861 );
4862 }
4863 ElastiCacheService::new(shared)
4864 }
4865
4866 fn service_with_serverless_cache(cache_name: &str) -> ElastiCacheService {
4867 let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
4868 let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
4869 {
4870 let mut s = shared.write();
4871 let arn =
4872 format!("arn:aws:elasticache:us-east-1:123456789012:serverlesscache:{cache_name}");
4873 s.tags.insert(arn.clone(), Vec::new());
4874 s.serverless_caches.insert(
4875 cache_name.to_string(),
4876 ServerlessCache {
4877 serverless_cache_name: cache_name.to_string(),
4878 description: "serverless cache".to_string(),
4879 engine: "redis".to_string(),
4880 major_engine_version: "7.1".to_string(),
4881 full_engine_version: "7.1".to_string(),
4882 status: "available".to_string(),
4883 endpoint: ServerlessCacheEndpoint {
4884 address: "127.0.0.1".to_string(),
4885 port: 6379,
4886 },
4887 reader_endpoint: ServerlessCacheEndpoint {
4888 address: "127.0.0.1".to_string(),
4889 port: 6379,
4890 },
4891 arn,
4892 created_at: "2024-01-01T00:00:00Z".to_string(),
4893 cache_usage_limits: Some(ServerlessCacheUsageLimits {
4894 data_storage: Some(ServerlessCacheDataStorage {
4895 maximum: Some(10),
4896 minimum: Some(1),
4897 unit: Some("GB".to_string()),
4898 }),
4899 ecpu_per_second: Some(ServerlessCacheEcpuPerSecond {
4900 maximum: Some(5000),
4901 minimum: Some(1000),
4902 }),
4903 }),
4904 security_group_ids: vec!["sg-123".to_string()],
4905 subnet_ids: vec!["subnet-123".to_string()],
4906 kms_key_id: Some("kms-123".to_string()),
4907 user_group_id: None,
4908 snapshot_retention_limit: Some(1),
4909 daily_snapshot_time: Some("03:00".to_string()),
4910 container_id: "cid".to_string(),
4911 host_port: 6379,
4912 },
4913 );
4914 }
4915 ElastiCacheService::new(shared)
4916 }
4917
4918 fn service_with_global_replication_group(
4919 global_replication_group_id: &str,
4920 replication_group_id: &str,
4921 ) -> ElastiCacheService {
4922 let service = service_with_replication_group(replication_group_id, 1);
4923 {
4924 let mut state = service.state.write();
4925 state
4926 .replication_groups
4927 .get_mut(replication_group_id)
4928 .unwrap()
4929 .global_replication_group_id = Some(global_replication_group_id.to_string());
4930 state
4931 .replication_groups
4932 .get_mut(replication_group_id)
4933 .unwrap()
4934 .global_replication_group_role = Some("primary".to_string());
4935 state.global_replication_groups.insert(
4936 global_replication_group_id.to_string(),
4937 GlobalReplicationGroup {
4938 global_replication_group_id: global_replication_group_id.to_string(),
4939 global_replication_group_description: "global test group".to_string(),
4940 status: "available".to_string(),
4941 cache_node_type: "cache.t3.micro".to_string(),
4942 engine: "redis".to_string(),
4943 engine_version: "7.1".to_string(),
4944 members: vec![GlobalReplicationGroupMember {
4945 replication_group_id: replication_group_id.to_string(),
4946 replication_group_region: "us-east-1".to_string(),
4947 role: "primary".to_string(),
4948 automatic_failover: false,
4949 status: "associated".to_string(),
4950 }],
4951 cluster_enabled: false,
4952 arn: format!(
4953 "arn:aws:elasticache:us-east-1:123456789012:globalreplicationgroup:{global_replication_group_id}"
4954 ),
4955 },
4956 );
4957 }
4958 service
4959 }
4960
4961 #[test]
4962 fn create_global_replication_group_registers_metadata_and_updates_primary_group() {
4963 let service = service_with_replication_group("primary-rg", 1);
4964 let req = request(
4965 "CreateGlobalReplicationGroup",
4966 &[
4967 ("GlobalReplicationGroupIdSuffix", "global-a"),
4968 ("PrimaryReplicationGroupId", "primary-rg"),
4969 ("GlobalReplicationGroupDescription", "global slice"),
4970 ],
4971 );
4972
4973 let resp = service.create_global_replication_group(&req).unwrap();
4974 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
4975 assert!(body.contains(
4976 "<GlobalReplicationGroupDescription>global slice</GlobalReplicationGroupDescription>"
4977 ));
4978 assert!(body.contains("<ReplicationGroupId>primary-rg</ReplicationGroupId>"));
4979 assert!(body.contains("<Role>primary</Role>"));
4980
4981 let state = service.state.read();
4982 let primary_group = state.replication_groups.get("primary-rg").unwrap();
4983 assert_eq!(
4984 primary_group.global_replication_group_id.as_deref(),
4985 Some("fc-us-east-1-global-a")
4986 );
4987 assert_eq!(
4988 primary_group.global_replication_group_role.as_deref(),
4989 Some("primary")
4990 );
4991 assert!(state
4992 .global_replication_groups
4993 .contains_key("fc-us-east-1-global-a"));
4994 }
4995
4996 #[test]
4997 fn describe_global_replication_groups_filters_by_id() {
4998 let service = service_with_global_replication_group("fc-us-east-1-global-a", "primary-rg");
4999 let req = request(
5000 "DescribeGlobalReplicationGroups",
5001 &[
5002 ("GlobalReplicationGroupId", "fc-us-east-1-global-a"),
5003 ("ShowMemberInfo", "true"),
5004 ],
5005 );
5006
5007 let resp = service.describe_global_replication_groups(&req).unwrap();
5008 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5009 assert!(body.contains(
5010 "<GlobalReplicationGroupId>fc-us-east-1-global-a</GlobalReplicationGroupId>"
5011 ));
5012 assert!(body.contains("<ReplicationGroupId>primary-rg</ReplicationGroupId>"));
5013 assert!(body.contains("<DescribeGlobalReplicationGroupsResponse"));
5014 }
5015
5016 #[test]
5017 fn modify_global_replication_group_updates_primary_replication_group_state() {
5018 let service = service_with_global_replication_group("fc-us-east-1-global-a", "primary-rg");
5019 let req = request(
5020 "ModifyGlobalReplicationGroup",
5021 &[
5022 ("GlobalReplicationGroupId", "fc-us-east-1-global-a"),
5023 ("ApplyImmediately", "true"),
5024 ("GlobalReplicationGroupDescription", "updated"),
5025 ("CacheNodeType", "cache.m5.large"),
5026 ("EngineVersion", "7.2"),
5027 ("AutomaticFailoverEnabled", "true"),
5028 ],
5029 );
5030
5031 let resp = service.modify_global_replication_group(&req).unwrap();
5032 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5033 assert!(body.contains(
5034 "<GlobalReplicationGroupDescription>updated</GlobalReplicationGroupDescription>"
5035 ));
5036 assert!(body.contains("<CacheNodeType>cache.m5.large</CacheNodeType>"));
5037 assert!(body.contains("<EngineVersion>7.2</EngineVersion>"));
5038
5039 let state = service.state.read();
5040 let primary_group = state.replication_groups.get("primary-rg").unwrap();
5041 assert_eq!(primary_group.cache_node_type, "cache.m5.large");
5042 assert_eq!(primary_group.engine_version, "7.2");
5043 assert!(primary_group.automatic_failover_enabled);
5044 }
5045
5046 #[test]
5047 fn delete_global_replication_group_clears_primary_group_association() {
5048 let service = service_with_global_replication_group("fc-us-east-1-global-a", "primary-rg");
5049 let req = request(
5050 "DeleteGlobalReplicationGroup",
5051 &[
5052 ("GlobalReplicationGroupId", "fc-us-east-1-global-a"),
5053 ("RetainPrimaryReplicationGroup", "true"),
5054 ],
5055 );
5056
5057 let resp = service.delete_global_replication_group(&req).unwrap();
5058 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5059 assert!(body.contains("<Status>deleting</Status>"));
5060
5061 let state = service.state.read();
5062 assert!(!state
5063 .global_replication_groups
5064 .contains_key("fc-us-east-1-global-a"));
5065 let primary_group = state.replication_groups.get("primary-rg").unwrap();
5066 assert!(primary_group.global_replication_group_id.is_none());
5067 assert!(primary_group.global_replication_group_role.is_none());
5068 }
5069
5070 #[test]
5071 fn replication_group_xml_includes_global_replication_group_info() {
5072 let service = service_with_global_replication_group("fc-us-east-1-global-a", "primary-rg");
5073 let req = request(
5074 "DescribeReplicationGroups",
5075 &[("ReplicationGroupId", "primary-rg")],
5076 );
5077
5078 let resp = service.describe_replication_groups(&req).unwrap();
5079 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5080 assert!(body.contains("<GlobalReplicationGroupInfo>"));
5081 assert!(body.contains(
5082 "<GlobalReplicationGroupId>fc-us-east-1-global-a</GlobalReplicationGroupId>"
5083 ));
5084 assert!(body.contains(
5085 "<GlobalReplicationGroupMemberRole>primary</GlobalReplicationGroupMemberRole>"
5086 ));
5087 }
5088
5089 #[test]
5090 fn failover_global_replication_group_returns_current_primary() {
5091 let service = service_with_global_replication_group("fc-us-east-1-global-a", "primary-rg");
5092 let req = request(
5093 "FailoverGlobalReplicationGroup",
5094 &[
5095 ("GlobalReplicationGroupId", "fc-us-east-1-global-a"),
5096 ("PrimaryRegion", "us-east-1"),
5097 ("PrimaryReplicationGroupId", "primary-rg"),
5098 ],
5099 );
5100
5101 let resp = service.failover_global_replication_group(&req).unwrap();
5102 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5103 assert!(body.contains("<ReplicationGroupId>primary-rg</ReplicationGroupId>"));
5104 assert!(body.contains("<FailoverGlobalReplicationGroupResponse"));
5105 }
5106
5107 #[test]
5108 fn disassociate_global_replication_group_accepts_current_primary_as_noop() {
5109 let service = service_with_global_replication_group("fc-us-east-1-global-a", "primary-rg");
5110 let req = request(
5111 "DisassociateGlobalReplicationGroup",
5112 &[
5113 ("GlobalReplicationGroupId", "fc-us-east-1-global-a"),
5114 ("ReplicationGroupId", "primary-rg"),
5115 ("ReplicationGroupRegion", "us-east-1"),
5116 ],
5117 );
5118
5119 let resp = service.disassociate_global_replication_group(&req).unwrap();
5120 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5121 assert!(body.contains("<ReplicationGroupId>primary-rg</ReplicationGroupId>"));
5122 assert!(body.contains("<DisassociateGlobalReplicationGroupResponse"));
5123 }
5124
5125 #[test]
5126 fn modify_replication_group_updates_description() {
5127 let service = service_with_replication_group("my-rg", 1);
5128 let req = request(
5129 "ModifyReplicationGroup",
5130 &[
5131 ("ReplicationGroupId", "my-rg"),
5132 ("ReplicationGroupDescription", "Updated description"),
5133 ],
5134 );
5135 let resp = service.modify_replication_group(&req).unwrap();
5136 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5137 assert!(body.contains("<Description>Updated description</Description>"));
5138 assert!(body.contains("<ModifyReplicationGroupResponse"));
5139 }
5140
5141 #[test]
5142 fn modify_replication_group_updates_multiple_fields() {
5143 let service = service_with_replication_group("my-rg", 1);
5144 let req = request(
5145 "ModifyReplicationGroup",
5146 &[
5147 ("ReplicationGroupId", "my-rg"),
5148 ("CacheNodeType", "cache.m5.large"),
5149 ("AutomaticFailoverEnabled", "true"),
5150 ("SnapshotRetentionLimit", "5"),
5151 ("SnapshotWindow", "02:00-06:00"),
5152 ],
5153 );
5154 let resp = service.modify_replication_group(&req).unwrap();
5155 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5156 assert!(body.contains("<CacheNodeType>cache.m5.large</CacheNodeType>"));
5157 assert!(body.contains("<AutomaticFailover>enabled</AutomaticFailover>"));
5158 assert!(body.contains("<SnapshotRetentionLimit>5</SnapshotRetentionLimit>"));
5159 assert!(body.contains("<SnapshotWindow>02:00-06:00</SnapshotWindow>"));
5160 }
5161
5162 #[test]
5163 fn modify_replication_group_not_found() {
5164 let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
5165 let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
5166 let service = ElastiCacheService::new(shared);
5167 let req = request(
5168 "ModifyReplicationGroup",
5169 &[("ReplicationGroupId", "nonexistent")],
5170 );
5171 assert!(service.modify_replication_group(&req).is_err());
5172 }
5173
5174 #[test]
5175 fn parse_cache_usage_limits_reads_nested_query_shape() {
5176 let req = request(
5177 "CreateServerlessCache",
5178 &[
5179 ("CacheUsageLimits.DataStorage.Maximum", "10"),
5180 ("CacheUsageLimits.DataStorage.Minimum", "2"),
5181 ("CacheUsageLimits.DataStorage.Unit", "GB"),
5182 ("CacheUsageLimits.ECPUPerSecond.Maximum", "5000"),
5183 ("CacheUsageLimits.ECPUPerSecond.Minimum", "1000"),
5184 ],
5185 );
5186
5187 let limits = parse_cache_usage_limits(&req).unwrap().unwrap();
5188 let data_storage = limits.data_storage.unwrap();
5189 assert_eq!(data_storage.maximum, Some(10));
5190 assert_eq!(data_storage.minimum, Some(2));
5191 assert_eq!(data_storage.unit.as_deref(), Some("GB"));
5192
5193 let ecpu = limits.ecpu_per_second.unwrap();
5194 assert_eq!(ecpu.maximum, Some(5000));
5195 assert_eq!(ecpu.minimum, Some(1000));
5196 }
5197
5198 #[test]
5199 fn serverless_cache_xml_contains_expected_fields() {
5200 let cache = service_with_serverless_cache("cache-a")
5201 .state
5202 .read()
5203 .serverless_caches["cache-a"]
5204 .clone();
5205
5206 let xml = serverless_cache_xml(&cache);
5207 assert!(xml.contains("<ServerlessCacheName>cache-a</ServerlessCacheName>"));
5208 assert!(xml.contains("<Engine>redis</Engine>"));
5209 assert!(xml.contains("<MajorEngineVersion>7.1</MajorEngineVersion>"));
5210 assert!(xml.contains("<Endpoint><Address>127.0.0.1</Address><Port>6379</Port></Endpoint>"));
5211 assert!(xml.contains(
5212 "<ReaderEndpoint><Address>127.0.0.1</Address><Port>6379</Port></ReaderEndpoint>"
5213 ));
5214 assert!(xml.contains(
5215 "<SecurityGroupIds><SecurityGroupId>sg-123</SecurityGroupId></SecurityGroupIds>"
5216 ));
5217 assert!(xml.contains("<SubnetIds><member>subnet-123</member></SubnetIds>"));
5218 assert!(xml.contains("<CacheUsageLimits>"));
5219 }
5220
5221 #[test]
5222 fn serverless_cache_snapshot_xml_contains_expected_fields() {
5223 let snapshot = ServerlessCacheSnapshot {
5224 serverless_cache_snapshot_name: "snap-a".to_string(),
5225 arn: "arn:aws:elasticache:us-east-1:123456789012:serverlesssnapshot:snap-a".to_string(),
5226 kms_key_id: Some("kms-123".to_string()),
5227 snapshot_type: "manual".to_string(),
5228 status: "available".to_string(),
5229 create_time: "2024-01-01T00:00:00Z".to_string(),
5230 expiry_time: None,
5231 bytes_used_for_cache: Some("0".to_string()),
5232 serverless_cache_name: "cache-a".to_string(),
5233 engine: "redis".to_string(),
5234 major_engine_version: "7.1".to_string(),
5235 };
5236
5237 let xml = serverless_cache_snapshot_xml(&snapshot);
5238 assert!(xml.contains("<ServerlessCacheSnapshotName>snap-a</ServerlessCacheSnapshotName>"));
5239 assert!(xml.contains("<KmsKeyId>kms-123</KmsKeyId>"));
5240 assert!(xml.contains("<SnapshotType>manual</SnapshotType>"));
5241 assert!(xml.contains("<ServerlessCacheConfiguration>"));
5242 assert!(xml.contains("<ServerlessCacheName>cache-a</ServerlessCacheName>"));
5243 }
5244
5245 #[test]
5246 fn describe_serverless_caches_returns_all() {
5247 let service = service_with_serverless_cache("cache-a");
5248 {
5249 let mut state = service.state.write();
5250 state.serverless_caches.insert(
5251 "cache-b".to_string(),
5252 ServerlessCache {
5253 serverless_cache_name: "cache-b".to_string(),
5254 description: "serverless cache".to_string(),
5255 engine: "valkey".to_string(),
5256 major_engine_version: "8.0".to_string(),
5257 full_engine_version: "8.0".to_string(),
5258 status: "available".to_string(),
5259 endpoint: ServerlessCacheEndpoint {
5260 address: "127.0.0.1".to_string(),
5261 port: 6380,
5262 },
5263 reader_endpoint: ServerlessCacheEndpoint {
5264 address: "127.0.0.1".to_string(),
5265 port: 6380,
5266 },
5267 arn: "arn:aws:elasticache:us-east-1:123456789012:serverlesscache:cache-b"
5268 .to_string(),
5269 created_at: "2024-01-02T00:00:00Z".to_string(),
5270 cache_usage_limits: None,
5271 security_group_ids: Vec::new(),
5272 subnet_ids: Vec::new(),
5273 kms_key_id: None,
5274 user_group_id: None,
5275 snapshot_retention_limit: None,
5276 daily_snapshot_time: None,
5277 container_id: "cid".to_string(),
5278 host_port: 6380,
5279 },
5280 );
5281 }
5282
5283 let resp = service
5284 .describe_serverless_caches(&request("DescribeServerlessCaches", &[]))
5285 .unwrap();
5286 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5287 assert!(body.contains("<ServerlessCacheName>cache-a</ServerlessCacheName>"));
5288 assert!(body.contains("<ServerlessCacheName>cache-b</ServerlessCacheName>"));
5289 }
5290
5291 #[test]
5292 fn modify_serverless_cache_updates_fields() {
5293 let service = service_with_serverless_cache("cache-a");
5294 let req = request(
5295 "ModifyServerlessCache",
5296 &[
5297 ("ServerlessCacheName", "cache-a"),
5298 ("Description", "updated"),
5299 ("SecurityGroupIds.SecurityGroupId.1", "sg-999"),
5300 ("SnapshotRetentionLimit", "7"),
5301 ("DailySnapshotTime", "05:00"),
5302 ],
5303 );
5304
5305 let resp = service.modify_serverless_cache(&req).unwrap();
5306 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5307 assert!(body.contains("<Description>updated</Description>"));
5308 assert!(body.contains(
5309 "<SecurityGroupIds><SecurityGroupId>sg-999</SecurityGroupId></SecurityGroupIds>"
5310 ));
5311 assert!(body.contains("<SnapshotRetentionLimit>7</SnapshotRetentionLimit>"));
5312 assert!(body.contains("<DailySnapshotTime>05:00</DailySnapshotTime>"));
5313 }
5314
5315 #[test]
5316 fn parse_query_list_param_reads_indexed_and_flat_query_values() {
5317 let req = AwsRequest {
5318 service: "elasticache".to_string(),
5319 action: "ModifyServerlessCache".to_string(),
5320 region: "us-east-1".to_string(),
5321 account_id: "000000000000".to_string(),
5322 request_id: "req-1".to_string(),
5323 headers: HeaderMap::new(),
5324 query_params: HashMap::from([
5325 ("SecurityGroupIds.member.1".to_string(), "sg-a".to_string()),
5326 ("SecurityGroupIds.member.2".to_string(), "sg-b".to_string()),
5327 ]),
5328 body: Bytes::new(),
5329 path_segments: vec![],
5330 raw_path: "/".to_string(),
5331 raw_query: String::new(),
5332 method: Method::POST,
5333 is_query_protocol: true,
5334 access_key_id: None,
5335 principal: None,
5336 };
5337 assert_eq!(
5338 parse_query_list_param(&req, "SecurityGroupIds", "SecurityGroupId"),
5339 vec!["sg-a".to_string(), "sg-b".to_string()]
5340 );
5341
5342 let req = AwsRequest {
5343 query_params: HashMap::from([("SecurityGroupIds".to_string(), "sg-flat".to_string())]),
5344 ..req
5345 };
5346 assert_eq!(
5347 parse_query_list_param(&req, "SecurityGroupIds", "SecurityGroupId"),
5348 vec!["sg-flat".to_string()]
5349 );
5350 }
5351
5352 #[test]
5353 fn describe_serverless_cache_snapshots_filters_by_cache_name() {
5354 let service = service_with_serverless_cache("cache-a");
5355 {
5356 let mut state = service.state.write();
5357 state.serverless_cache_snapshots.insert(
5358 "snap-a".to_string(),
5359 ServerlessCacheSnapshot {
5360 serverless_cache_snapshot_name: "snap-a".to_string(),
5361 arn: "arn:aws:elasticache:us-east-1:123456789012:serverlesssnapshot:snap-a"
5362 .to_string(),
5363 kms_key_id: None,
5364 snapshot_type: "manual".to_string(),
5365 status: "available".to_string(),
5366 create_time: "2024-01-01T00:00:00Z".to_string(),
5367 expiry_time: None,
5368 bytes_used_for_cache: None,
5369 serverless_cache_name: "cache-a".to_string(),
5370 engine: "redis".to_string(),
5371 major_engine_version: "7.1".to_string(),
5372 },
5373 );
5374 }
5375
5376 let resp = service
5377 .describe_serverless_cache_snapshots(&request(
5378 "DescribeServerlessCacheSnapshots",
5379 &[("ServerlessCacheName", "cache-a")],
5380 ))
5381 .unwrap();
5382 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5383 assert!(body.contains("<ServerlessCacheSnapshotName>snap-a</ServerlessCacheSnapshotName>"));
5384 }
5385
5386 #[test]
5387 fn delete_serverless_cache_snapshot_removes_tags() {
5388 let service = service_with_serverless_cache("cache-a");
5389 {
5390 let mut state = service.state.write();
5391 let arn =
5392 "arn:aws:elasticache:us-east-1:123456789012:serverlesssnapshot:snap-a".to_string();
5393 state.tags.insert(arn.clone(), Vec::new());
5394 state.serverless_cache_snapshots.insert(
5395 "snap-a".to_string(),
5396 ServerlessCacheSnapshot {
5397 serverless_cache_snapshot_name: "snap-a".to_string(),
5398 arn,
5399 kms_key_id: None,
5400 snapshot_type: "manual".to_string(),
5401 status: "available".to_string(),
5402 create_time: "2024-01-01T00:00:00Z".to_string(),
5403 expiry_time: None,
5404 bytes_used_for_cache: None,
5405 serverless_cache_name: "cache-a".to_string(),
5406 engine: "redis".to_string(),
5407 major_engine_version: "7.1".to_string(),
5408 },
5409 );
5410 }
5411
5412 let resp = service
5413 .delete_serverless_cache_snapshot(&request(
5414 "DeleteServerlessCacheSnapshot",
5415 &[("ServerlessCacheSnapshotName", "snap-a")],
5416 ))
5417 .unwrap();
5418 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5419 assert!(body.contains("<Status>deleting</Status>"));
5420 assert!(!service
5421 .state
5422 .read()
5423 .tags
5424 .contains_key("arn:aws:elasticache:us-east-1:123456789012:serverlesssnapshot:snap-a"));
5425 }
5426
5427 #[test]
5428 fn increase_replica_count_updates_member_clusters() {
5429 let service = service_with_replication_group("my-rg", 1);
5430 let req = request(
5431 "IncreaseReplicaCount",
5432 &[
5433 ("ReplicationGroupId", "my-rg"),
5434 ("ApplyImmediately", "true"),
5435 ("NewReplicaCount", "2"),
5436 ],
5437 );
5438 let resp = service.increase_replica_count(&req).unwrap();
5439 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5440 assert!(body.contains("<ClusterId>my-rg-001</ClusterId>"));
5441 assert!(body.contains("<ClusterId>my-rg-002</ClusterId>"));
5442 assert!(body.contains("<ClusterId>my-rg-003</ClusterId>"));
5443 assert!(body.contains("<IncreaseReplicaCountResponse"));
5444 }
5445
5446 #[test]
5447 fn increase_replica_count_rejects_same_or_lower() {
5448 let service = service_with_replication_group("my-rg", 3);
5449 let req = request(
5450 "IncreaseReplicaCount",
5451 &[
5452 ("ReplicationGroupId", "my-rg"),
5453 ("ApplyImmediately", "true"),
5454 ("NewReplicaCount", "2"),
5455 ],
5456 );
5457 assert!(service.increase_replica_count(&req).is_err());
5458 }
5459
5460 #[test]
5461 fn increase_replica_count_not_found() {
5462 let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
5463 let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
5464 let service = ElastiCacheService::new(shared);
5465 let req = request(
5466 "IncreaseReplicaCount",
5467 &[
5468 ("ReplicationGroupId", "nonexistent"),
5469 ("ApplyImmediately", "true"),
5470 ("NewReplicaCount", "2"),
5471 ],
5472 );
5473 assert!(service.increase_replica_count(&req).is_err());
5474 }
5475
5476 #[test]
5477 fn decrease_replica_count_updates_member_clusters() {
5478 let service = service_with_replication_group("my-rg", 3);
5479 let req = request(
5480 "DecreaseReplicaCount",
5481 &[
5482 ("ReplicationGroupId", "my-rg"),
5483 ("ApplyImmediately", "true"),
5484 ("NewReplicaCount", "1"),
5485 ],
5486 );
5487 let resp = service.decrease_replica_count(&req).unwrap();
5488 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5489 assert!(body.contains("<ClusterId>my-rg-001</ClusterId>"));
5490 assert!(body.contains("<ClusterId>my-rg-002</ClusterId>"));
5491 assert!(!body.contains("<ClusterId>my-rg-003</ClusterId>"));
5492 assert!(body.contains("<DecreaseReplicaCountResponse"));
5493 }
5494
5495 #[test]
5496 fn decrease_replica_count_validates_minimum() {
5497 let service = service_with_replication_group("my-rg", 1);
5498 let req = request(
5500 "DecreaseReplicaCount",
5501 &[
5502 ("ReplicationGroupId", "my-rg"),
5503 ("ApplyImmediately", "true"),
5504 ("NewReplicaCount", "0"),
5505 ],
5506 );
5507 assert!(service.decrease_replica_count(&req).is_err());
5508 }
5509
5510 #[test]
5511 fn decrease_replica_count_rejects_negative() {
5512 let service = service_with_replication_group("my-rg", 2);
5513 let req = request(
5514 "DecreaseReplicaCount",
5515 &[
5516 ("ReplicationGroupId", "my-rg"),
5517 ("ApplyImmediately", "true"),
5518 ("NewReplicaCount", "-1"),
5519 ],
5520 );
5521 assert!(service.decrease_replica_count(&req).is_err());
5522 }
5523
5524 #[test]
5525 fn test_failover_validates_node_group() {
5526 let service = service_with_replication_group("my-rg", 1);
5527 let req = request(
5528 "TestFailover",
5529 &[("ReplicationGroupId", "my-rg"), ("NodeGroupId", "0001")],
5530 );
5531 let resp = service.test_failover(&req).unwrap();
5532 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5533 assert!(body.contains("<Status>available</Status>"));
5534 assert!(body.contains("<TestFailoverResponse"));
5535 }
5536
5537 #[test]
5538 fn test_failover_rejects_invalid_node_group() {
5539 let service = service_with_replication_group("my-rg", 1);
5540 let req = request(
5541 "TestFailover",
5542 &[("ReplicationGroupId", "my-rg"), ("NodeGroupId", "9999")],
5543 );
5544 assert!(service.test_failover(&req).is_err());
5545 }
5546
5547 #[test]
5548 fn test_failover_not_found() {
5549 let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
5550 let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
5551 let service = ElastiCacheService::new(shared);
5552 let req = request(
5553 "TestFailover",
5554 &[
5555 ("ReplicationGroupId", "nonexistent"),
5556 ("NodeGroupId", "0001"),
5557 ],
5558 );
5559 assert!(service.test_failover(&req).is_err());
5560 }
5561
5562 #[test]
5565 fn create_snapshot_returns_snapshot_xml() {
5566 let service = service_with_replication_group("snap-rg", 1);
5567 let req = request(
5568 "CreateSnapshot",
5569 &[
5570 ("SnapshotName", "my-snap"),
5571 ("ReplicationGroupId", "snap-rg"),
5572 ],
5573 );
5574 let resp = service.create_snapshot(&req).unwrap();
5575 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5576 assert!(body.contains("<SnapshotName>my-snap</SnapshotName>"));
5577 assert!(body.contains("<ReplicationGroupId>snap-rg</ReplicationGroupId>"));
5578 assert!(body.contains("<SnapshotStatus>available</SnapshotStatus>"));
5579 assert!(body.contains("<SnapshotSource>manual</SnapshotSource>"));
5580 assert!(body.contains("<Engine>redis</Engine>"));
5581 assert!(body.contains("<CreateSnapshotResponse"));
5582 }
5583
5584 #[test]
5585 fn create_snapshot_via_cache_cluster_id() {
5586 let service = service_with_replication_group("cc-rg", 2);
5587 let req = request(
5588 "CreateSnapshot",
5589 &[
5590 ("SnapshotName", "cluster-snap"),
5591 ("CacheClusterId", "cc-rg-001"),
5592 ],
5593 );
5594 let resp = service.create_snapshot(&req).unwrap();
5595 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5596 assert!(body.contains("<ReplicationGroupId>cc-rg</ReplicationGroupId>"));
5597 }
5598
5599 #[test]
5600 fn create_snapshot_rejects_missing_group_and_cluster() {
5601 let service = service_with_replication_group("rg", 1);
5602 let req = request("CreateSnapshot", &[("SnapshotName", "bad-snap")]);
5603 assert!(service.create_snapshot(&req).is_err());
5604 }
5605
5606 #[test]
5607 fn create_snapshot_rejects_duplicate_name() {
5608 let service = service_with_replication_group("dup-rg", 1);
5609 let req = request(
5610 "CreateSnapshot",
5611 &[
5612 ("SnapshotName", "dup-snap"),
5613 ("ReplicationGroupId", "dup-rg"),
5614 ],
5615 );
5616 service.create_snapshot(&req).unwrap();
5617 assert!(service.create_snapshot(&req).is_err());
5618 }
5619
5620 #[test]
5621 fn create_snapshot_rejects_nonexistent_group() {
5622 let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
5623 let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
5624 let service = ElastiCacheService::new(shared);
5625 let req = request(
5626 "CreateSnapshot",
5627 &[
5628 ("SnapshotName", "orphan"),
5629 ("ReplicationGroupId", "no-such-rg"),
5630 ],
5631 );
5632 assert!(service.create_snapshot(&req).is_err());
5633 }
5634
5635 #[test]
5636 fn create_snapshot_rejects_missing_name() {
5637 let service = service_with_replication_group("rg", 1);
5638 let req = request("CreateSnapshot", &[("ReplicationGroupId", "rg")]);
5639 assert!(service.create_snapshot(&req).is_err());
5640 }
5641
5642 #[test]
5643 fn create_snapshot_registers_arn_for_tags() {
5644 let service = service_with_replication_group("tag-rg", 1);
5645 let req = request(
5646 "CreateSnapshot",
5647 &[
5648 ("SnapshotName", "tag-snap"),
5649 ("ReplicationGroupId", "tag-rg"),
5650 ],
5651 );
5652 service.create_snapshot(&req).unwrap();
5653
5654 let state = service.state.read();
5655 let arn = "arn:aws:elasticache:us-east-1:123456789012:snapshot:tag-snap".to_string();
5656 assert!(state.tags.contains_key(&arn));
5657 }
5658
5659 #[test]
5660 fn describe_snapshots_returns_all() {
5661 let service = service_with_replication_group("desc-rg", 1);
5662 for name in &["snap-a", "snap-b"] {
5663 let req = request(
5664 "CreateSnapshot",
5665 &[("SnapshotName", name), ("ReplicationGroupId", "desc-rg")],
5666 );
5667 service.create_snapshot(&req).unwrap();
5668 }
5669 let req = request("DescribeSnapshots", &[]);
5670 let resp = service.describe_snapshots(&req).unwrap();
5671 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5672 assert!(body.contains("<SnapshotName>snap-a</SnapshotName>"));
5673 assert!(body.contains("<SnapshotName>snap-b</SnapshotName>"));
5674 assert!(body.contains("<DescribeSnapshotsResponse"));
5675 }
5676
5677 #[test]
5678 fn describe_snapshots_filters_by_name() {
5679 let service = service_with_replication_group("filt-rg", 1);
5680 for name in &["snap-1", "snap-2"] {
5681 let req = request(
5682 "CreateSnapshot",
5683 &[("SnapshotName", name), ("ReplicationGroupId", "filt-rg")],
5684 );
5685 service.create_snapshot(&req).unwrap();
5686 }
5687 let req = request("DescribeSnapshots", &[("SnapshotName", "snap-1")]);
5688 let resp = service.describe_snapshots(&req).unwrap();
5689 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5690 assert!(body.contains("<SnapshotName>snap-1</SnapshotName>"));
5691 assert!(!body.contains("<SnapshotName>snap-2</SnapshotName>"));
5692 }
5693
5694 #[test]
5695 fn describe_snapshots_filters_by_replication_group() {
5696 let service = service_with_replication_group("rg-a", 1);
5697 let req = request(
5698 "CreateSnapshot",
5699 &[
5700 ("SnapshotName", "rg-a-snap"),
5701 ("ReplicationGroupId", "rg-a"),
5702 ],
5703 );
5704 service.create_snapshot(&req).unwrap();
5705
5706 let req = request("DescribeSnapshots", &[("ReplicationGroupId", "rg-a")]);
5707 let resp = service.describe_snapshots(&req).unwrap();
5708 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5709 assert!(body.contains("<SnapshotName>rg-a-snap</SnapshotName>"));
5710
5711 let req = request("DescribeSnapshots", &[("ReplicationGroupId", "rg-b")]);
5713 let resp = service.describe_snapshots(&req).unwrap();
5714 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5715 assert!(!body.contains("<SnapshotName>"));
5716 }
5717
5718 #[test]
5719 fn describe_snapshots_not_found_by_name() {
5720 let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
5721 let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
5722 let service = ElastiCacheService::new(shared);
5723 let req = request("DescribeSnapshots", &[("SnapshotName", "nope")]);
5724 assert!(service.describe_snapshots(&req).is_err());
5725 }
5726
5727 #[test]
5728 fn delete_snapshot_removes_and_returns_deleting() {
5729 let service = service_with_replication_group("del-rg", 1);
5730 let req = request(
5731 "CreateSnapshot",
5732 &[
5733 ("SnapshotName", "del-snap"),
5734 ("ReplicationGroupId", "del-rg"),
5735 ],
5736 );
5737 service.create_snapshot(&req).unwrap();
5738
5739 let req = request("DeleteSnapshot", &[("SnapshotName", "del-snap")]);
5740 let resp = service.delete_snapshot(&req).unwrap();
5741 let body = String::from_utf8(resp.body.expect_bytes().to_vec()).unwrap();
5742 assert!(body.contains("<SnapshotStatus>deleting</SnapshotStatus>"));
5743 assert!(body.contains("<DeleteSnapshotResponse"));
5744
5745 assert!(!service.state.read().snapshots.contains_key("del-snap"));
5747 }
5748
5749 #[test]
5750 fn delete_snapshot_cleans_up_tags() {
5751 let service = service_with_replication_group("tag-del-rg", 1);
5752 let req = request(
5753 "CreateSnapshot",
5754 &[
5755 ("SnapshotName", "tag-del-snap"),
5756 ("ReplicationGroupId", "tag-del-rg"),
5757 ],
5758 );
5759 service.create_snapshot(&req).unwrap();
5760
5761 let arn = "arn:aws:elasticache:us-east-1:123456789012:snapshot:tag-del-snap".to_string();
5762 assert!(service.state.read().tags.contains_key(&arn));
5763
5764 let req = request("DeleteSnapshot", &[("SnapshotName", "tag-del-snap")]);
5765 service.delete_snapshot(&req).unwrap();
5766 assert!(!service.state.read().tags.contains_key(&arn));
5767 }
5768
5769 #[test]
5770 fn delete_snapshot_not_found() {
5771 let state = crate::state::ElastiCacheState::new("123456789012", "us-east-1");
5772 let shared = std::sync::Arc::new(parking_lot::RwLock::new(state));
5773 let service = ElastiCacheService::new(shared);
5774 let req = request("DeleteSnapshot", &[("SnapshotName", "nope")]);
5775 assert!(service.delete_snapshot(&req).is_err());
5776 }
5777
5778 #[test]
5779 fn snapshot_xml_contains_all_fields() {
5780 let snap = CacheSnapshot {
5781 snapshot_name: "test-snap".to_string(),
5782 replication_group_id: "rg-1".to_string(),
5783 replication_group_description: "desc".to_string(),
5784 snapshot_status: "available".to_string(),
5785 cache_node_type: "cache.t3.micro".to_string(),
5786 engine: "redis".to_string(),
5787 engine_version: "7.1".to_string(),
5788 num_cache_clusters: 2,
5789 arn: "arn:aws:elasticache:us-east-1:123:snapshot:test-snap".to_string(),
5790 created_at: "2024-01-01T00:00:00Z".to_string(),
5791 snapshot_source: "manual".to_string(),
5792 };
5793 let xml = snapshot_xml(&snap);
5794 assert!(xml.contains("<SnapshotName>test-snap</SnapshotName>"));
5795 assert!(xml.contains("<ReplicationGroupId>rg-1</ReplicationGroupId>"));
5796 assert!(xml.contains("<SnapshotStatus>available</SnapshotStatus>"));
5797 assert!(xml.contains("<SnapshotSource>manual</SnapshotSource>"));
5798 assert!(xml.contains("<CacheNodeType>cache.t3.micro</CacheNodeType>"));
5799 assert!(xml.contains("<Engine>redis</Engine>"));
5800 assert!(xml.contains("<EngineVersion>7.1</EngineVersion>"));
5801 assert!(xml.contains("<NumCacheClusters>2</NumCacheClusters>"));
5802 assert!(xml.contains("<ARN>arn:aws:elasticache:us-east-1:123:snapshot:test-snap</ARN>"));
5803 }
5804}