1use std::convert::TryFrom;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use http::StatusCode;
6use tokio::sync::Mutex as AsyncMutex;
7
8use fakecloud_aws::xml::xml_escape;
9use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::runtime::ElastiCacheRuntime;
14use crate::state::{
15 default_engine_versions, default_parameters_for_family, CacheCluster, CacheEngineVersion,
16 CacheParameterGroup, CacheSnapshot, CacheSubnetGroup, ElastiCacheSnapshot, ElastiCacheState,
17 ElastiCacheUser, ElastiCacheUserGroup, EngineDefaultParameter, GlobalReplicationGroup,
18 GlobalReplicationGroupMember, LogDeliveryConfiguration, RecurringCharge, ReplicationGroup,
19 ReservedCacheNode, ReservedCacheNodesOffering, ServerlessCache, ServerlessCacheDataStorage,
20 ServerlessCacheEcpuPerSecond, ServerlessCacheEndpoint, ServerlessCacheSnapshot,
21 ServerlessCacheUsageLimits, SharedElastiCacheState, ELASTICACHE_SNAPSHOT_SCHEMA_VERSION,
22};
23
24const ELASTICACHE_NS: &str = "http://elasticache.amazonaws.com/doc/2015-02-02/";
25
26const ENGINE_REDIS: &str = "redis";
29const ENGINE_VALKEY: &str = "valkey";
30const ENGINE_MEMCACHED: &str = "memcached";
31const SUPPORTED_ENGINES: &[&str] = &[ENGINE_REDIS, ENGINE_VALKEY, ENGINE_MEMCACHED];
32
33fn validate_engine(engine: &str) -> Result<(), AwsServiceError> {
34 if !SUPPORTED_ENGINES.contains(&engine) {
35 return Err(AwsServiceError::aws_error(
36 StatusCode::BAD_REQUEST,
37 "InvalidParameterValue",
38 format!(
39 "Invalid value for Engine: {engine}. Supported engines: redis, valkey, memcached"
40 ),
41 ));
42 }
43 Ok(())
44}
45
46fn reject_memcached_for(engine: &str, feature: &str) -> Result<(), AwsServiceError> {
47 if engine == ENGINE_MEMCACHED {
48 return Err(AwsServiceError::aws_error(
49 StatusCode::BAD_REQUEST,
50 "InvalidParameterValue",
51 format!("{feature} is not supported for the memcached engine."),
52 ));
53 }
54 Ok(())
55}
56
57fn max_node_groups_for(engine: &str, engine_version: &str) -> i32 {
67 if engine == ENGINE_REDIS {
68 let mut parts = engine_version.split('.').map(|p| p.parse::<u32>().ok());
71 if let Some(Some(major)) = parts.next() {
72 let minor = parts.next().flatten().unwrap_or(0);
73 let patch = parts.next().flatten().unwrap_or(0);
74 let pre_506 = major < 5 || (major == 5 && minor == 0 && patch < 6);
76 if pre_506 {
77 return 90;
78 }
79 }
80 }
81 500
82}
83const SUPPORTED_ACTIONS: &[&str] = &[
84 "AddTagsToResource",
85 "CreateCacheCluster",
86 "CreateGlobalReplicationGroup",
87 "CreateCacheSubnetGroup",
88 "CreateReplicationGroup",
89 "CreateServerlessCache",
90 "CreateServerlessCacheSnapshot",
91 "CreateSnapshot",
92 "CreateUser",
93 "CreateUserGroup",
94 "DecreaseReplicaCount",
95 "DeleteCacheCluster",
96 "DeleteGlobalReplicationGroup",
97 "DeleteCacheSubnetGroup",
98 "DeleteReplicationGroup",
99 "DeleteServerlessCache",
100 "DeleteServerlessCacheSnapshot",
101 "DeleteSnapshot",
102 "DeleteUser",
103 "DeleteUserGroup",
104 "DescribeCacheClusters",
105 "DescribeCacheEngineVersions",
106 "DescribeGlobalReplicationGroups",
107 "DescribeCacheParameterGroups",
108 "DescribeReservedCacheNodes",
109 "DescribeReservedCacheNodesOfferings",
110 "DescribeCacheSubnetGroups",
111 "DescribeEngineDefaultParameters",
112 "DescribeReplicationGroups",
113 "DescribeServerlessCaches",
114 "DescribeServerlessCacheSnapshots",
115 "DescribeSnapshots",
116 "DescribeUserGroups",
117 "DescribeUsers",
118 "DisassociateGlobalReplicationGroup",
119 "FailoverGlobalReplicationGroup",
120 "IncreaseReplicaCount",
121 "ListTagsForResource",
122 "ModifyCacheSubnetGroup",
123 "ModifyGlobalReplicationGroup",
124 "ModifyReplicationGroup",
125 "ModifyServerlessCache",
126 "RemoveTagsFromResource",
127 "TestFailover",
128 "AuthorizeCacheSecurityGroupIngress",
129 "RevokeCacheSecurityGroupIngress",
130 "CreateCacheSecurityGroup",
131 "DeleteCacheSecurityGroup",
132 "DescribeCacheSecurityGroups",
133 "CreateCacheParameterGroup",
134 "DeleteCacheParameterGroup",
135 "ModifyCacheParameterGroup",
136 "ResetCacheParameterGroup",
137 "DescribeCacheParameters",
138 "ModifyCacheCluster",
139 "RebootCacheCluster",
140 "ListAllowedNodeTypeModifications",
141 "ModifyReplicationGroupShardConfiguration",
142 "DecreaseNodeGroupsInGlobalReplicationGroup",
143 "IncreaseNodeGroupsInGlobalReplicationGroup",
144 "RebalanceSlotsInGlobalReplicationGroup",
145 "ModifyUser",
146 "ModifyUserGroup",
147 "PurchaseReservedCacheNodesOffering",
148 "DescribeEvents",
149 "DescribeServiceUpdates",
150 "DescribeUpdateActions",
151 "BatchApplyUpdateAction",
152 "BatchStopUpdateAction",
153 "CopySnapshot",
154 "CopyServerlessCacheSnapshot",
155 "ExportServerlessCacheSnapshot",
156 "StartMigration",
157 "CompleteMigration",
158 "TestMigration",
159];
160
161pub(crate) fn is_recoverable_status(status: &str) -> bool {
167 !matches!(status, "deleting" | "deleted")
168}
169
170pub struct ElastiCacheService {
171 state: SharedElastiCacheState,
172 runtime: Option<Arc<ElastiCacheRuntime>>,
173 snapshot_store: Option<Arc<dyn SnapshotStore>>,
174 snapshot_lock: Arc<AsyncMutex<()>>,
175}
176
177mod clusters;
178mod misc;
179mod parameter_groups;
180mod replication;
181mod security_groups;
182mod serverless;
183mod snapshots;
184mod subnet_groups;
185mod users;
186
187impl ElastiCacheService {
188 pub fn new(state: SharedElastiCacheState) -> Self {
189 Self {
190 state,
191 runtime: None,
192 snapshot_store: None,
193 snapshot_lock: Arc::new(AsyncMutex::new(())),
194 }
195 }
196
197 pub fn with_runtime(mut self, runtime: Arc<ElastiCacheRuntime>) -> Self {
198 self.runtime = Some(runtime);
199 self
200 }
201
202 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
203 self.snapshot_store = Some(store);
204 self
205 }
206
207 async fn save_snapshot(&self) {
208 save_snapshot_static(
209 self.state.clone(),
210 self.snapshot_store.clone(),
211 self.snapshot_lock.clone(),
212 )
213 .await;
214 }
215
216 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
220 let store = self.snapshot_store.clone()?;
221 let state = self.state.clone();
222 let lock = self.snapshot_lock.clone();
223 Some(Arc::new(move || {
224 let state = state.clone();
225 let store = store.clone();
226 let lock = lock.clone();
227 Box::pin(async move {
228 save_snapshot_static(state, Some(store), lock).await;
229 })
230 }))
231 }
232
233 pub async fn recover_persisted_containers(&self) {
239 let Some(runtime) = self.runtime.clone() else {
240 return;
241 };
242
243 struct PendingCluster {
244 account_id: String,
245 id: String,
246 engine: String,
247 }
248 struct PendingReplication {
249 account_id: String,
250 id: String,
251 engine: String,
252 }
253 struct PendingServerless {
254 account_id: String,
255 name: String,
256 }
257
258 let (clusters, replications, serverless) = {
259 let mut accounts = self.state.write();
260 let mut clusters = Vec::new();
261 let mut replications = Vec::new();
262 let mut serverless = Vec::new();
263 for (_, state) in accounts.iter_mut() {
264 let account_id = state.account_id.clone();
265 for (id, cluster) in state.cache_clusters.iter_mut() {
266 if is_recoverable_status(&cluster.cache_cluster_status) {
272 cluster.cache_cluster_status = "starting".to_string();
273 clusters.push(PendingCluster {
274 account_id: account_id.clone(),
275 id: id.clone(),
276 engine: cluster.engine.clone(),
277 });
278 }
279 }
280 for (id, rg) in state.replication_groups.iter_mut() {
281 if is_recoverable_status(&rg.status) {
282 rg.status = "starting".to_string();
283 replications.push(PendingReplication {
284 account_id: account_id.clone(),
285 id: id.clone(),
286 engine: rg.engine.clone(),
287 });
288 }
289 }
290 for (name, sc) in state.serverless_caches.iter_mut() {
291 if is_recoverable_status(&sc.status) {
292 sc.status = "creating".to_string();
293 serverless.push(PendingServerless {
294 account_id: account_id.clone(),
295 name: name.clone(),
296 });
297 }
298 }
299 }
300 (clusters, replications, serverless)
301 };
302
303 let total = clusters.len() + replications.len() + serverless.len();
304 if total == 0 {
305 return;
306 }
307 tracing::info!(
308 count = total,
309 "recovering backing containers for persisted elasticache resources",
310 );
311
312 for c in clusters {
313 let runtime = runtime.clone();
314 let state = self.state.clone();
315 let snapshot_store = self.snapshot_store.clone();
316 let snapshot_lock = self.snapshot_lock.clone();
317 tokio::spawn(async move {
318 let pod_tags: std::collections::BTreeMap<String, String> = {
322 let accounts = state.read();
323 accounts
324 .get(&c.account_id)
325 .and_then(|s| {
326 s.cache_clusters
327 .get(&c.id)
328 .and_then(|cl| s.tags.get(&cl.arn))
329 })
330 .map(|t| t.iter().cloned().collect())
331 .unwrap_or_default()
332 };
333 let result = if c.engine == "memcached" {
334 runtime.ensure_memcached(&c.id, &pod_tags).await
335 } else {
336 runtime.ensure_redis(&c.id, None, &pod_tags).await
337 };
338 match result {
339 Ok(running) => {
340 {
341 let mut accounts = state.write();
342 if let Some(s) = accounts.get_mut(&c.account_id) {
343 if let Some(cluster) = s.cache_clusters.get_mut(&c.id) {
344 cluster.cache_cluster_status = "available".to_string();
345 cluster.endpoint_address = running.endpoint_address.clone();
346 cluster.endpoint_port = running.endpoint_port;
347 cluster.host_port = running.host_port;
348 cluster.container_id = running.container_id;
349 }
350 }
351 }
352 save_snapshot_static(state, snapshot_store, snapshot_lock).await;
353 }
354 Err(error) => {
355 tracing::error!(
356 %error,
357 cache_cluster_id = %c.id,
358 "failed to recover elasticache cache cluster after restart",
359 );
360 {
361 let mut accounts = state.write();
362 if let Some(s) = accounts.get_mut(&c.account_id) {
363 if let Some(cluster) = s.cache_clusters.get_mut(&c.id) {
364 cluster.cache_cluster_status =
365 "incompatible-network".to_string();
366 }
367 }
368 }
369 save_snapshot_static(state, snapshot_store, snapshot_lock).await;
370 }
371 }
372 });
373 }
374
375 for r in replications {
376 let runtime = runtime.clone();
377 let state = self.state.clone();
378 let snapshot_store = self.snapshot_store.clone();
379 let snapshot_lock = self.snapshot_lock.clone();
380 tokio::spawn(async move {
381 let pod_tags: std::collections::BTreeMap<String, String> = {
384 let accounts = state.read();
385 accounts
386 .get(&r.account_id)
387 .and_then(|s| {
388 s.replication_groups
389 .get(&r.id)
390 .and_then(|rg| s.tags.get(&rg.arn))
391 })
392 .map(|t| t.iter().cloned().collect())
393 .unwrap_or_default()
394 };
395 let result = if r.engine == "memcached" {
396 runtime.ensure_memcached(&r.id, &pod_tags).await
397 } else {
398 runtime.ensure_redis(&r.id, None, &pod_tags).await
399 };
400 match result {
401 Ok(running) => {
402 {
403 let mut accounts = state.write();
404 if let Some(s) = accounts.get_mut(&r.account_id) {
405 if let Some(rg) = s.replication_groups.get_mut(&r.id) {
406 rg.status = "available".to_string();
407 rg.endpoint_address = running.endpoint_address.clone();
408 rg.endpoint_port = running.endpoint_port;
409 rg.host_port = running.host_port;
410 rg.container_id = running.container_id;
411 }
412 }
413 }
414 save_snapshot_static(state, snapshot_store, snapshot_lock).await;
415 }
416 Err(error) => {
417 tracing::error!(
418 %error,
419 replication_group_id = %r.id,
420 "failed to recover elasticache replication group after restart",
421 );
422 {
423 let mut accounts = state.write();
424 if let Some(s) = accounts.get_mut(&r.account_id) {
425 if let Some(rg) = s.replication_groups.get_mut(&r.id) {
426 rg.status = "incompatible-network".to_string();
427 }
428 }
429 }
430 save_snapshot_static(state, snapshot_store, snapshot_lock).await;
431 }
432 }
433 });
434 }
435
436 for s in serverless {
437 let runtime = runtime.clone();
438 let state = self.state.clone();
439 let snapshot_store = self.snapshot_store.clone();
440 let snapshot_lock = self.snapshot_lock.clone();
441 tokio::spawn(async move {
442 let pod_tags: std::collections::BTreeMap<String, String> = {
445 let accounts = state.read();
446 accounts
447 .get(&s.account_id)
448 .and_then(|st| {
449 st.serverless_caches
450 .get(&s.name)
451 .and_then(|c| st.tags.get(&c.arn))
452 })
453 .map(|t| t.iter().cloned().collect())
454 .unwrap_or_default()
455 };
456 match runtime.ensure_redis(&s.name, None, &pod_tags).await {
457 Ok(running) => {
458 {
459 let mut accounts = state.write();
460 if let Some(st) = accounts.get_mut(&s.account_id) {
461 if let Some(cache) = st.serverless_caches.get_mut(&s.name) {
462 cache.status = "available".to_string();
463 cache.endpoint.address = running.endpoint_address.clone();
464 cache.endpoint.port = running.endpoint_port;
465 cache.reader_endpoint.address =
466 running.endpoint_address.clone();
467 cache.reader_endpoint.port = running.endpoint_port;
468 cache.host_port = running.host_port;
469 cache.container_id = running.container_id;
470 }
471 }
472 }
473 save_snapshot_static(state, snapshot_store, snapshot_lock).await;
474 }
475 Err(error) => {
476 tracing::error!(
477 %error,
478 serverless_cache_name = %s.name,
479 "failed to recover elasticache serverless cache after restart",
480 );
481 {
482 let mut accounts = state.write();
483 if let Some(st) = accounts.get_mut(&s.account_id) {
484 if let Some(cache) = st.serverless_caches.get_mut(&s.name) {
485 cache.status = "create-failed".to_string();
486 }
487 }
488 }
489 save_snapshot_static(state, snapshot_store, snapshot_lock).await;
490 }
491 }
492 });
493 }
494 }
495}
496
497async fn save_snapshot_static(
498 state: SharedElastiCacheState,
499 store: Option<Arc<dyn SnapshotStore>>,
500 lock: Arc<AsyncMutex<()>>,
501) {
502 let Some(store) = store else {
503 return;
504 };
505 let _guard = lock.lock().await;
506 let snapshot = ElastiCacheSnapshot {
507 schema_version: ELASTICACHE_SNAPSHOT_SCHEMA_VERSION,
508 state: None,
509 accounts: Some(state.read().clone()),
510 };
511 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
512 let bytes = serde_json::to_vec(&snapshot)
513 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
514 store.save(&bytes)
515 })
516 .await;
517 match join {
518 Ok(Ok(())) => {}
519 Ok(Err(err)) => tracing::error!(%err, "failed to write elasticache snapshot"),
520 Err(err) => tracing::error!(%err, "elasticache snapshot task panicked"),
521 }
522}
523
524fn is_mutating_action(action: &str) -> bool {
525 !matches!(
526 action,
527 "DescribeCacheClusters"
528 | "DescribeCacheEngineVersions"
529 | "DescribeGlobalReplicationGroups"
530 | "DescribeCacheParameterGroups"
531 | "DescribeReservedCacheNodes"
532 | "DescribeReservedCacheNodesOfferings"
533 | "DescribeCacheSubnetGroups"
534 | "DescribeEngineDefaultParameters"
535 | "DescribeReplicationGroups"
536 | "DescribeServerlessCaches"
537 | "DescribeServerlessCacheSnapshots"
538 | "DescribeSnapshots"
539 | "DescribeUserGroups"
540 | "DescribeUsers"
541 | "ListTagsForResource"
542 | "DescribeCacheSecurityGroups"
543 | "DescribeCacheParameters"
544 | "DescribeEvents"
545 | "DescribeServiceUpdates"
546 | "DescribeUpdateActions"
547 | "ListAllowedNodeTypeModifications"
548 )
549}
550
551#[async_trait]
552impl AwsService for ElastiCacheService {
553 fn service_name(&self) -> &str {
554 "elasticache"
555 }
556
557 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
558 let mutates = is_mutating_action(request.action.as_str());
559 let result = match request.action.as_str() {
560 "AddTagsToResource" => self.add_tags_to_resource(&request),
561 "CreateCacheCluster" => self.create_cache_cluster(&request).await,
562 "CreateGlobalReplicationGroup" => self.create_global_replication_group(&request),
563 "CreateCacheSubnetGroup" => self.create_cache_subnet_group(&request),
564 "CreateReplicationGroup" => self.create_replication_group(&request).await,
565 "CreateServerlessCache" => self.create_serverless_cache(&request).await,
566 "CreateServerlessCacheSnapshot" => self.create_serverless_cache_snapshot(&request),
567 "CreateSnapshot" => self.create_snapshot(&request).await,
568 "CreateUser" => self.create_user(&request),
569 "CreateUserGroup" => self.create_user_group(&request),
570 "DecreaseReplicaCount" => self.decrease_replica_count(&request),
571 "DeleteCacheCluster" => self.delete_cache_cluster(&request).await,
572 "DeleteGlobalReplicationGroup" => self.delete_global_replication_group(&request),
573 "DeleteCacheSubnetGroup" => self.delete_cache_subnet_group(&request),
574 "DeleteReplicationGroup" => self.delete_replication_group(&request).await,
575 "DeleteServerlessCache" => self.delete_serverless_cache(&request).await,
576 "DeleteServerlessCacheSnapshot" => self.delete_serverless_cache_snapshot(&request),
577 "DeleteSnapshot" => self.delete_snapshot(&request),
578 "DeleteUser" => self.delete_user(&request),
579 "DeleteUserGroup" => self.delete_user_group(&request),
580 "DescribeCacheClusters" => self.describe_cache_clusters(&request),
581 "DescribeCacheEngineVersions" => self.describe_cache_engine_versions(&request),
582 "DescribeGlobalReplicationGroups" => self.describe_global_replication_groups(&request),
583 "DescribeCacheParameterGroups" => self.describe_cache_parameter_groups(&request),
584 "DescribeReservedCacheNodes" => self.describe_reserved_cache_nodes(&request),
585 "DescribeReservedCacheNodesOfferings" => {
586 self.describe_reserved_cache_nodes_offerings(&request)
587 }
588 "DescribeCacheSubnetGroups" => self.describe_cache_subnet_groups(&request),
589 "DescribeEngineDefaultParameters" => self.describe_engine_default_parameters(&request),
590 "DescribeReplicationGroups" => self.describe_replication_groups(&request),
591 "DescribeServerlessCaches" => self.describe_serverless_caches(&request),
592 "DescribeServerlessCacheSnapshots" => {
593 self.describe_serverless_cache_snapshots(&request)
594 }
595 "DescribeSnapshots" => self.describe_snapshots(&request),
596 "DescribeUserGroups" => self.describe_user_groups(&request),
597 "DescribeUsers" => self.describe_users(&request),
598 "DisassociateGlobalReplicationGroup" => {
599 self.disassociate_global_replication_group(&request)
600 }
601 "FailoverGlobalReplicationGroup" => self.failover_global_replication_group(&request),
602 "IncreaseReplicaCount" => self.increase_replica_count(&request),
603 "ListTagsForResource" => self.list_tags_for_resource(&request),
604 "ModifyCacheSubnetGroup" => self.modify_cache_subnet_group(&request),
605 "ModifyGlobalReplicationGroup" => self.modify_global_replication_group(&request),
606 "ModifyReplicationGroup" => self.modify_replication_group(&request).await,
607 "ModifyServerlessCache" => self.modify_serverless_cache(&request),
608 "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
609 "TestFailover" => self.test_failover(&request),
610 "AuthorizeCacheSecurityGroupIngress" => {
611 self.authorize_cache_security_group_ingress(&request)
612 }
613 "RevokeCacheSecurityGroupIngress" => self.revoke_cache_security_group_ingress(&request),
614 "CreateCacheSecurityGroup" => self.create_cache_security_group(&request),
615 "DeleteCacheSecurityGroup" => self.delete_cache_security_group(&request),
616 "DescribeCacheSecurityGroups" => self.describe_cache_security_groups(&request),
617 "CreateCacheParameterGroup" => self.create_cache_parameter_group(&request),
618 "DeleteCacheParameterGroup" => self.delete_cache_parameter_group(&request),
619 "ModifyCacheParameterGroup" => self.modify_cache_parameter_group(&request).await,
620 "ResetCacheParameterGroup" => self.reset_cache_parameter_group(&request),
621 "DescribeCacheParameters" => self.describe_cache_parameters(&request),
622 "ModifyCacheCluster" => self.modify_cache_cluster(&request),
623 "RebootCacheCluster" => self.reboot_cache_cluster(&request).await,
624 "ListAllowedNodeTypeModifications" => {
625 self.list_allowed_node_type_modifications(&request)
626 }
627 "ModifyReplicationGroupShardConfiguration" => {
628 self.modify_replication_group_shard_configuration(&request)
629 }
630 "DecreaseNodeGroupsInGlobalReplicationGroup" => {
631 self.decrease_node_groups_in_global_replication_group(&request)
632 }
633 "IncreaseNodeGroupsInGlobalReplicationGroup" => {
634 self.increase_node_groups_in_global_replication_group(&request)
635 }
636 "RebalanceSlotsInGlobalReplicationGroup" => {
637 self.rebalance_slots_in_global_replication_group(&request)
638 }
639 "ModifyUser" => self.modify_user(&request).await,
640 "ModifyUserGroup" => self.modify_user_group(&request).await,
641 "PurchaseReservedCacheNodesOffering" => {
642 self.purchase_reserved_cache_nodes_offering(&request)
643 }
644 "DescribeEvents" => self.describe_events(&request),
645 "DescribeServiceUpdates" => self.describe_service_updates(&request),
646 "DescribeUpdateActions" => self.describe_update_actions(&request),
647 "BatchApplyUpdateAction" => self.batch_apply_update_action(&request),
648 "BatchStopUpdateAction" => self.batch_stop_update_action(&request),
649 "CopySnapshot" => self.copy_snapshot(&request),
650 "CopyServerlessCacheSnapshot" => self.copy_serverless_cache_snapshot(&request),
651 "ExportServerlessCacheSnapshot" => self.export_serverless_cache_snapshot(&request),
652 "StartMigration" => self.start_migration(&request),
653 "CompleteMigration" => self.complete_migration(&request),
654 "TestMigration" => self.test_migration(&request),
655 _ => Err(AwsServiceError::action_not_implemented(
656 self.service_name(),
657 &request.action,
658 )),
659 };
660 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
661 self.save_snapshot().await;
662 }
663 result
664 }
665
666 fn supported_actions(&self) -> &[&str] {
667 SUPPORTED_ACTIONS
668 }
669}
670
671impl ElastiCacheService {
672 fn start_migration(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
675 self.migration_op(request, "StartMigration", "queued")
676 }
677
678 fn complete_migration(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
679 let id = required_query_param(request, "ReplicationGroupId")?;
680 let mut accounts = self.state.write();
681 let state = accounts.get_or_create(&request.account_id);
682 if !state.migrations.contains_key(&id) {
687 return Err(AwsServiceError::aws_error(
688 StatusCode::NOT_FOUND,
689 "ReplicationGroupNotUnderMigrationFault",
690 format!("ReplicationGroup {id} is not currently being migrated."),
691 ));
692 }
693 if !state.replication_groups.contains_key(&id) {
694 return Err(AwsServiceError::aws_error(
695 StatusCode::NOT_FOUND,
696 "ReplicationGroupNotFoundFault",
697 format!("ReplicationGroup {id} not found."),
698 ));
699 }
700 let migration = state.migrations.get_mut(&id).expect("checked above");
701 migration.status = "complete".to_string();
702 let group = state.replication_groups.get(&id).expect("checked above");
703 let region = state.region.clone();
704 let xml = replication_group_xml(group, ®ion);
705 Ok(AwsResponse::xml(
706 StatusCode::OK,
707 query_response_xml(
708 "CompleteMigration",
709 ELASTICACHE_NS,
710 &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
711 &request.request_id,
712 ),
713 ))
714 }
715
716 fn test_migration(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
717 self.migration_op(request, "TestMigration", "test-passed")
718 }
719
720 fn migration_op(
721 &self,
722 request: &AwsRequest,
723 action: &str,
724 status: &str,
725 ) -> Result<AwsResponse, AwsServiceError> {
726 let id = required_query_param(request, "ReplicationGroupId")?;
727 let endpoint_addr =
730 collect_member_field(request, "CustomerNodeEndpointList.member", "Address")
731 .into_iter()
732 .next()
733 .unwrap_or_else(|| "127.0.0.1".to_string());
734 let endpoint_port =
735 collect_member_field(request, "CustomerNodeEndpointList.member", "Port")
736 .into_iter()
737 .next()
738 .and_then(|v| v.parse::<i32>().ok())
739 .unwrap_or(6379);
740
741 let mut accounts = self.state.write();
742 let state = accounts.get_or_create(&request.account_id);
743 let group = state.replication_groups.get(&id).ok_or_else(|| {
744 AwsServiceError::aws_error(
745 StatusCode::NOT_FOUND,
746 "ReplicationGroupNotFoundFault",
747 format!("ReplicationGroup {id} not found."),
748 )
749 })?;
750 let region = state.region.clone();
751 let xml = replication_group_xml(group, ®ion);
752 state.migrations.insert(
753 id.clone(),
754 crate::state::Migration {
755 replication_group_id: id,
756 customer_node_endpoint_address: endpoint_addr,
757 customer_node_endpoint_port: endpoint_port,
758 status: status.to_string(),
759 started_at: chrono::Utc::now().to_rfc3339(),
760 },
761 );
762 Ok(AwsResponse::xml(
763 StatusCode::OK,
764 query_response_xml(
765 action,
766 ELASTICACHE_NS,
767 &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
768 &request.request_id,
769 ),
770 ))
771 }
772
773 async fn apply_parameters_for_group(&self, account_id: &str, param_group_name: &str) {
777 let Some(runtime) = self.runtime.as_ref() else {
778 return;
779 };
780 let (target_ids, params) = {
781 let accounts = self.state.read();
782 let state = accounts.get(account_id);
783 let Some(state) = state else { return };
784 let mut target_ids = Vec::new();
785 for c in state.cache_clusters.values() {
786 if c.cache_parameter_group_name.as_deref() == Some(param_group_name)
787 && (c.engine == ENGINE_REDIS || c.engine == ENGINE_VALKEY)
788 {
789 target_ids.push(c.cache_cluster_id.clone());
790 }
791 }
792 for g in state.replication_groups.values() {
793 if g.cache_parameter_group_name.as_deref() == Some(param_group_name)
794 && (g.engine == ENGINE_REDIS || g.engine == ENGINE_VALKEY)
795 {
796 target_ids.push(g.replication_group_id.clone());
797 }
798 }
799 let params = state
800 .parameter_group_parameters
801 .get(param_group_name)
802 .cloned()
803 .unwrap_or_default();
804 (target_ids, params)
805 };
806 for id in target_ids {
807 for param in ¶ms {
808 if !param.is_modifiable {
809 continue;
810 }
811 let args = vec![
812 "CONFIG".to_string(),
813 "SET".to_string(),
814 param.parameter_name.clone(),
815 param.parameter_value.clone(),
816 ];
817 match runtime.exec_redis(&id, &args).await {
818 Ok(output) if !output.success => {
819 tracing::warn!(
820 resource_id = %id,
821 param = %param.parameter_name,
822 stderr = %String::from_utf8_lossy(&output.stderr),
823 "CONFIG SET failed"
824 );
825 }
826 Err(e) => {
827 tracing::warn!(
828 resource_id = %id,
829 param = %param.parameter_name,
830 %e,
831 "CONFIG SET exec failed"
832 );
833 }
834 _ => {}
835 }
836 }
837 }
838 }
839}
840
841#[path = "../helpers.rs"]
844mod helpers;
845use helpers::*;
846
847#[cfg(test)]
848#[path = "../service_tests.rs"]
849mod tests;