Skip to main content

fakecloud_elasticache/service/
mod.rs

1use std::convert::TryFrom;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use http::StatusCode;
6use tokio::sync::Mutex as AsyncMutex;
7
8use fakecloud_aws::xml::xml_escape;
9use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::runtime::ElastiCacheRuntime;
14use crate::state::{
15    default_engine_versions, default_parameters_for_family, CacheCluster, CacheEngineVersion,
16    CacheParameterGroup, CacheSnapshot, CacheSubnetGroup, ElastiCacheSnapshot, ElastiCacheState,
17    ElastiCacheUser, ElastiCacheUserGroup, EngineDefaultParameter, GlobalReplicationGroup,
18    GlobalReplicationGroupMember, LogDeliveryConfiguration, RecurringCharge, ReplicationGroup,
19    ReservedCacheNode, ReservedCacheNodesOffering, ServerlessCache, ServerlessCacheDataStorage,
20    ServerlessCacheEcpuPerSecond, ServerlessCacheEndpoint, ServerlessCacheSnapshot,
21    ServerlessCacheUsageLimits, SharedElastiCacheState, ELASTICACHE_SNAPSHOT_SCHEMA_VERSION,
22};
23
/// Namespace URI for the ElastiCache `2015-02-02` API.
/// NOTE(review): no use of this constant is visible in this part of the file;
/// presumably it is stamped on the XML responses -- confirm against callers.
const ELASTICACHE_NS: &str = "http://elasticache.amazonaws.com/doc/2015-02-02/";

/// Cache engine wire values. Stored as `String` on `CacheCluster` etc., but
/// validated against this list at the wire boundary so a typo can't slip in.
const ENGINE_REDIS: &str = "redis";
const ENGINE_VALKEY: &str = "valkey";
const ENGINE_MEMCACHED: &str = "memcached";
/// Every engine value accepted by `validate_engine`.
const SUPPORTED_ENGINES: &[&str] = &[ENGINE_REDIS, ENGINE_VALKEY, ENGINE_MEMCACHED];
32
33fn validate_engine(engine: &str) -> Result<(), AwsServiceError> {
34    if !SUPPORTED_ENGINES.contains(&engine) {
35        return Err(AwsServiceError::aws_error(
36            StatusCode::BAD_REQUEST,
37            "InvalidParameterValue",
38            format!(
39                "Invalid value for Engine: {engine}. Supported engines: redis, valkey, memcached"
40            ),
41        ));
42    }
43    Ok(())
44}
45
46fn reject_memcached_for(engine: &str, feature: &str) -> Result<(), AwsServiceError> {
47    if engine == ENGINE_MEMCACHED {
48        return Err(AwsServiceError::aws_error(
49            StatusCode::BAD_REQUEST,
50            "InvalidParameterValue",
51            format!("{feature} is not supported for the memcached engine."),
52        ));
53    }
54    Ok(())
55}
56
57/// Engine + version-dependent ceiling on NumNodeGroups for cluster-mode
58/// replication groups, mirroring AWS's documented limits. Redis prior to
59/// 5.0.6 was capped at 90 shards; 5.0.6+ and Valkey raised the ceiling to
60/// 500. Memcached has no replication groups (rejected upstream) but
61/// returning the modern 500 here makes the function total. Unparseable
62/// version strings fall through to the modern ceiling because fakecloud
63/// only ships the redis 7.x / valkey 8.x backing images today; treating
64/// unknown versions as "legacy" would surprise callers passing odd
65/// strings like the engine label `Redis`.
66fn max_node_groups_for(engine: &str, engine_version: &str) -> i32 {
67    if engine == ENGINE_REDIS {
68        // Parse "MAJOR.MINOR.PATCH" or "MAJOR.MINOR" and only flip to the
69        // legacy 90-shard cap when we successfully parsed a major version.
70        let mut parts = engine_version.split('.').map(|p| p.parse::<u32>().ok());
71        if let Some(Some(major)) = parts.next() {
72            let minor = parts.next().flatten().unwrap_or(0);
73            let patch = parts.next().flatten().unwrap_or(0);
74            // 5.0.6 is the threshold. Anything strictly earlier is capped at 90.
75            let pre_506 = major < 5 || (major == 5 && minor == 0 && patch < 6);
76            if pre_506 {
77                return 90;
78            }
79        }
80    }
81    500
82}
/// Action names this service advertises through
/// `AwsService::supported_actions`. Every entry is expected to have a matching
/// arm in `ElastiCacheService::handle`; keep the two in sync when adding an
/// action.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddTagsToResource",
    "CreateCacheCluster",
    "CreateGlobalReplicationGroup",
    "CreateCacheSubnetGroup",
    "CreateReplicationGroup",
    "CreateServerlessCache",
    "CreateServerlessCacheSnapshot",
    "CreateSnapshot",
    "CreateUser",
    "CreateUserGroup",
    "DecreaseReplicaCount",
    "DeleteCacheCluster",
    "DeleteGlobalReplicationGroup",
    "DeleteCacheSubnetGroup",
    "DeleteReplicationGroup",
    "DeleteServerlessCache",
    "DeleteServerlessCacheSnapshot",
    "DeleteSnapshot",
    "DeleteUser",
    "DeleteUserGroup",
    "DescribeCacheClusters",
    "DescribeCacheEngineVersions",
    "DescribeGlobalReplicationGroups",
    "DescribeCacheParameterGroups",
    "DescribeReservedCacheNodes",
    "DescribeReservedCacheNodesOfferings",
    "DescribeCacheSubnetGroups",
    "DescribeEngineDefaultParameters",
    "DescribeReplicationGroups",
    "DescribeServerlessCaches",
    "DescribeServerlessCacheSnapshots",
    "DescribeSnapshots",
    "DescribeUserGroups",
    "DescribeUsers",
    "DisassociateGlobalReplicationGroup",
    "FailoverGlobalReplicationGroup",
    "IncreaseReplicaCount",
    "ListTagsForResource",
    "ModifyCacheSubnetGroup",
    "ModifyGlobalReplicationGroup",
    "ModifyReplicationGroup",
    "ModifyServerlessCache",
    "RemoveTagsFromResource",
    "TestFailover",
    "AuthorizeCacheSecurityGroupIngress",
    "RevokeCacheSecurityGroupIngress",
    "CreateCacheSecurityGroup",
    "DeleteCacheSecurityGroup",
    "DescribeCacheSecurityGroups",
    "CreateCacheParameterGroup",
    "DeleteCacheParameterGroup",
    "ModifyCacheParameterGroup",
    "ResetCacheParameterGroup",
    "DescribeCacheParameters",
    "ModifyCacheCluster",
    "RebootCacheCluster",
    "ListAllowedNodeTypeModifications",
    "ModifyReplicationGroupShardConfiguration",
    "DecreaseNodeGroupsInGlobalReplicationGroup",
    "IncreaseNodeGroupsInGlobalReplicationGroup",
    "RebalanceSlotsInGlobalReplicationGroup",
    "ModifyUser",
    "ModifyUserGroup",
    "PurchaseReservedCacheNodesOffering",
    "DescribeEvents",
    "DescribeServiceUpdates",
    "DescribeUpdateActions",
    "BatchApplyUpdateAction",
    "BatchStopUpdateAction",
    "CopySnapshot",
    "CopyServerlessCacheSnapshot",
    "ExportServerlessCacheSnapshot",
    "StartMigration",
    "CompleteMigration",
    "TestMigration",
];
160
/// Decides whether a persisted resource should be re-driven on restart.
///
/// Everything that is not being torn down qualifies -- not only `available`
/// resources. A resource snapshotted mid-transition
/// (creating/modifying/rebooting/starting) would otherwise never have its
/// background task re-spawned and would stay stuck forever (bug-audit
/// 2026-06-20, 4.5).
pub(crate) fn is_recoverable_status(status: &str) -> bool {
    const TEARDOWN_STATUSES: [&str; 2] = ["deleting", "deleted"];
    !TEARDOWN_STATUSES.contains(&status)
}
169
/// ElastiCache service front end: dispatches API actions against the shared
/// state and, when configured, drives backing containers and persists
/// snapshots.
pub struct ElastiCacheService {
    /// Shared state, read and written behind its `read()`/`write()` guards.
    state: SharedElastiCacheState,
    /// Container runtime; `None` until `with_runtime` is called, in which case
    /// `recover_persisted_containers` returns without doing anything.
    runtime: Option<Arc<ElastiCacheRuntime>>,
    /// Snapshot destination; `None` until `with_snapshot_store` is called, in
    /// which case saving a snapshot is a no-op.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Held by `save_snapshot_static` while it copies the state and writes it
    /// out, so snapshot writes from this service do not overlap.
    snapshot_lock: Arc<AsyncMutex<()>>,
}
176
// Child modules of this service. NOTE(review): the action handlers called
// from `handle` are not defined in the visible part of this file, so they
// presumably live in these modules, grouped by resource kind -- confirm.
mod clusters;
mod misc;
mod parameter_groups;
mod replication;
mod security_groups;
mod serverless;
mod snapshots;
mod subnet_groups;
mod users;
186
187impl ElastiCacheService {
188    pub fn new(state: SharedElastiCacheState) -> Self {
189        Self {
190            state,
191            runtime: None,
192            snapshot_store: None,
193            snapshot_lock: Arc::new(AsyncMutex::new(())),
194        }
195    }
196
197    pub fn with_runtime(mut self, runtime: Arc<ElastiCacheRuntime>) -> Self {
198        self.runtime = Some(runtime);
199        self
200    }
201
202    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
203        self.snapshot_store = Some(store);
204        self
205    }
206
207    async fn save_snapshot(&self) {
208        save_snapshot_static(
209            self.state.clone(),
210            self.snapshot_store.clone(),
211            self.snapshot_lock.clone(),
212        )
213        .await;
214    }
215
216    /// Build a hook that persists the current state when invoked, or `None` in
217    /// memory mode. The CloudFormation provisioner mutates `state` directly and
218    /// uses this to write a CFN-provisioned resource through to disk.
219    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
220        let store = self.snapshot_store.clone()?;
221        let state = self.state.clone();
222        let lock = self.snapshot_lock.clone();
223        Some(Arc::new(move || {
224            let state = state.clone();
225            let store = store.clone();
226            let lock = lock.clone();
227            Box::pin(async move {
228                save_snapshot_static(state, Some(store), lock).await;
229            })
230        }))
231    }
232
233    /// Recreate the backing Docker/Podman containers for persisted cache
234    /// clusters, replication groups, and serverless caches after a
235    /// fakecloud restart. Same bug class as RDS #1338 — without this,
236    /// describe ops report the cluster as `available` while the
237    /// container is gone, so the endpoint is dead.
238    pub async fn recover_persisted_containers(&self) {
239        let Some(runtime) = self.runtime.clone() else {
240            return;
241        };
242
243        struct PendingCluster {
244            account_id: String,
245            id: String,
246            engine: String,
247        }
248        struct PendingReplication {
249            account_id: String,
250            id: String,
251            engine: String,
252        }
253        struct PendingServerless {
254            account_id: String,
255            name: String,
256        }
257
258        let (clusters, replications, serverless) = {
259            let mut accounts = self.state.write();
260            let mut clusters = Vec::new();
261            let mut replications = Vec::new();
262            let mut serverless = Vec::new();
263            for (_, state) in accounts.iter_mut() {
264                let account_id = state.account_id.clone();
265                for (id, cluster) in state.cache_clusters.iter_mut() {
266                    // Re-drive anything not being torn down. The old code only
267                    // recovered "available" resources, so one snapshotted
268                    // mid-transition (creating/modifying/rebooting) was never
269                    // re-spawned and stayed stuck forever (bug-audit 2026-06-20,
270                    // 4.5).
271                    if is_recoverable_status(&cluster.cache_cluster_status) {
272                        cluster.cache_cluster_status = "starting".to_string();
273                        clusters.push(PendingCluster {
274                            account_id: account_id.clone(),
275                            id: id.clone(),
276                            engine: cluster.engine.clone(),
277                        });
278                    }
279                }
280                for (id, rg) in state.replication_groups.iter_mut() {
281                    if is_recoverable_status(&rg.status) {
282                        rg.status = "starting".to_string();
283                        replications.push(PendingReplication {
284                            account_id: account_id.clone(),
285                            id: id.clone(),
286                            engine: rg.engine.clone(),
287                        });
288                    }
289                }
290                for (name, sc) in state.serverless_caches.iter_mut() {
291                    if is_recoverable_status(&sc.status) {
292                        sc.status = "creating".to_string();
293                        serverless.push(PendingServerless {
294                            account_id: account_id.clone(),
295                            name: name.clone(),
296                        });
297                    }
298                }
299            }
300            (clusters, replications, serverless)
301        };
302
303        let total = clusters.len() + replications.len() + serverless.len();
304        if total == 0 {
305            return;
306        }
307        tracing::info!(
308            count = total,
309            "recovering backing containers for persisted elasticache resources",
310        );
311
312        for c in clusters {
313            let runtime = runtime.clone();
314            let state = self.state.clone();
315            let snapshot_store = self.snapshot_store.clone();
316            let snapshot_lock = self.snapshot_lock.clone();
317            tokio::spawn(async move {
318                // Re-derive this cluster's reserved `fakecloud-k8s/*`
319                // scheduling tags from persisted state so the recreated Pod
320                // keeps its node placement across a restart.
321                let pod_tags: std::collections::BTreeMap<String, String> = {
322                    let accounts = state.read();
323                    accounts
324                        .get(&c.account_id)
325                        .and_then(|s| {
326                            s.cache_clusters
327                                .get(&c.id)
328                                .and_then(|cl| s.tags.get(&cl.arn))
329                        })
330                        .map(|t| t.iter().cloned().collect())
331                        .unwrap_or_default()
332                };
333                let result = if c.engine == "memcached" {
334                    runtime.ensure_memcached(&c.id, &pod_tags).await
335                } else {
336                    runtime.ensure_redis(&c.id, None, &pod_tags).await
337                };
338                match result {
339                    Ok(running) => {
340                        {
341                            let mut accounts = state.write();
342                            if let Some(s) = accounts.get_mut(&c.account_id) {
343                                if let Some(cluster) = s.cache_clusters.get_mut(&c.id) {
344                                    cluster.cache_cluster_status = "available".to_string();
345                                    cluster.endpoint_address = running.endpoint_address.clone();
346                                    cluster.endpoint_port = running.endpoint_port;
347                                    cluster.host_port = running.host_port;
348                                    cluster.container_id = running.container_id;
349                                }
350                            }
351                        }
352                        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
353                    }
354                    Err(error) => {
355                        tracing::error!(
356                            %error,
357                            cache_cluster_id = %c.id,
358                            "failed to recover elasticache cache cluster after restart",
359                        );
360                        {
361                            let mut accounts = state.write();
362                            if let Some(s) = accounts.get_mut(&c.account_id) {
363                                if let Some(cluster) = s.cache_clusters.get_mut(&c.id) {
364                                    cluster.cache_cluster_status =
365                                        "incompatible-network".to_string();
366                                }
367                            }
368                        }
369                        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
370                    }
371                }
372            });
373        }
374
375        for r in replications {
376            let runtime = runtime.clone();
377            let state = self.state.clone();
378            let snapshot_store = self.snapshot_store.clone();
379            let snapshot_lock = self.snapshot_lock.clone();
380            tokio::spawn(async move {
381                // Re-derive this group's reserved `fakecloud-k8s/*`
382                // scheduling tags from persisted state for the recreated Pod.
383                let pod_tags: std::collections::BTreeMap<String, String> = {
384                    let accounts = state.read();
385                    accounts
386                        .get(&r.account_id)
387                        .and_then(|s| {
388                            s.replication_groups
389                                .get(&r.id)
390                                .and_then(|rg| s.tags.get(&rg.arn))
391                        })
392                        .map(|t| t.iter().cloned().collect())
393                        .unwrap_or_default()
394                };
395                let result = if r.engine == "memcached" {
396                    runtime.ensure_memcached(&r.id, &pod_tags).await
397                } else {
398                    runtime.ensure_redis(&r.id, None, &pod_tags).await
399                };
400                match result {
401                    Ok(running) => {
402                        {
403                            let mut accounts = state.write();
404                            if let Some(s) = accounts.get_mut(&r.account_id) {
405                                if let Some(rg) = s.replication_groups.get_mut(&r.id) {
406                                    rg.status = "available".to_string();
407                                    rg.endpoint_address = running.endpoint_address.clone();
408                                    rg.endpoint_port = running.endpoint_port;
409                                    rg.host_port = running.host_port;
410                                    rg.container_id = running.container_id;
411                                }
412                            }
413                        }
414                        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
415                    }
416                    Err(error) => {
417                        tracing::error!(
418                            %error,
419                            replication_group_id = %r.id,
420                            "failed to recover elasticache replication group after restart",
421                        );
422                        {
423                            let mut accounts = state.write();
424                            if let Some(s) = accounts.get_mut(&r.account_id) {
425                                if let Some(rg) = s.replication_groups.get_mut(&r.id) {
426                                    rg.status = "incompatible-network".to_string();
427                                }
428                            }
429                        }
430                        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
431                    }
432                }
433            });
434        }
435
436        for s in serverless {
437            let runtime = runtime.clone();
438            let state = self.state.clone();
439            let snapshot_store = self.snapshot_store.clone();
440            let snapshot_lock = self.snapshot_lock.clone();
441            tokio::spawn(async move {
442                // Re-derive this serverless cache's reserved `fakecloud-k8s/*`
443                // scheduling tags from persisted state for the recreated Pod.
444                let pod_tags: std::collections::BTreeMap<String, String> = {
445                    let accounts = state.read();
446                    accounts
447                        .get(&s.account_id)
448                        .and_then(|st| {
449                            st.serverless_caches
450                                .get(&s.name)
451                                .and_then(|c| st.tags.get(&c.arn))
452                        })
453                        .map(|t| t.iter().cloned().collect())
454                        .unwrap_or_default()
455                };
456                match runtime.ensure_redis(&s.name, None, &pod_tags).await {
457                    Ok(running) => {
458                        {
459                            let mut accounts = state.write();
460                            if let Some(st) = accounts.get_mut(&s.account_id) {
461                                if let Some(cache) = st.serverless_caches.get_mut(&s.name) {
462                                    cache.status = "available".to_string();
463                                    cache.endpoint.address = running.endpoint_address.clone();
464                                    cache.endpoint.port = running.endpoint_port;
465                                    cache.reader_endpoint.address =
466                                        running.endpoint_address.clone();
467                                    cache.reader_endpoint.port = running.endpoint_port;
468                                    cache.host_port = running.host_port;
469                                    cache.container_id = running.container_id;
470                                }
471                            }
472                        }
473                        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
474                    }
475                    Err(error) => {
476                        tracing::error!(
477                            %error,
478                            serverless_cache_name = %s.name,
479                            "failed to recover elasticache serverless cache after restart",
480                        );
481                        {
482                            let mut accounts = state.write();
483                            if let Some(st) = accounts.get_mut(&s.account_id) {
484                                if let Some(cache) = st.serverless_caches.get_mut(&s.name) {
485                                    cache.status = "create-failed".to_string();
486                                }
487                            }
488                        }
489                        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
490                    }
491                }
492            });
493        }
494    }
495}
496
497async fn save_snapshot_static(
498    state: SharedElastiCacheState,
499    store: Option<Arc<dyn SnapshotStore>>,
500    lock: Arc<AsyncMutex<()>>,
501) {
502    let Some(store) = store else {
503        return;
504    };
505    let _guard = lock.lock().await;
506    let snapshot = ElastiCacheSnapshot {
507        schema_version: ELASTICACHE_SNAPSHOT_SCHEMA_VERSION,
508        state: None,
509        accounts: Some(state.read().clone()),
510    };
511    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
512        let bytes = serde_json::to_vec(&snapshot)
513            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
514        store.save(&bytes)
515    })
516    .await;
517    match join {
518        Ok(Ok(())) => {}
519        Ok(Err(err)) => tracing::error!(%err, "failed to write elasticache snapshot"),
520        Err(err) => tracing::error!(%err, "elasticache snapshot task panicked"),
521    }
522}
523
/// Reports whether `action` may change state and therefore warrants a
/// snapshot after it succeeds.
///
/// Only the actions in the read-only table below return `false`; any other
/// string, including an unknown action name, is treated as mutating.
fn is_mutating_action(action: &str) -> bool {
    const READ_ONLY_ACTIONS: &[&str] = &[
        "DescribeCacheClusters",
        "DescribeCacheEngineVersions",
        "DescribeGlobalReplicationGroups",
        "DescribeCacheParameterGroups",
        "DescribeReservedCacheNodes",
        "DescribeReservedCacheNodesOfferings",
        "DescribeCacheSubnetGroups",
        "DescribeEngineDefaultParameters",
        "DescribeReplicationGroups",
        "DescribeServerlessCaches",
        "DescribeServerlessCacheSnapshots",
        "DescribeSnapshots",
        "DescribeUserGroups",
        "DescribeUsers",
        "ListTagsForResource",
        "DescribeCacheSecurityGroups",
        "DescribeCacheParameters",
        "DescribeEvents",
        "DescribeServiceUpdates",
        "DescribeUpdateActions",
        "ListAllowedNodeTypeModifications",
    ];
    !READ_ONLY_ACTIONS.contains(&action)
}
550
551#[async_trait]
552impl AwsService for ElastiCacheService {
553    fn service_name(&self) -> &str {
554        "elasticache"
555    }
556
557    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
558        let mutates = is_mutating_action(request.action.as_str());
559        let result = match request.action.as_str() {
560            "AddTagsToResource" => self.add_tags_to_resource(&request),
561            "CreateCacheCluster" => self.create_cache_cluster(&request).await,
562            "CreateGlobalReplicationGroup" => self.create_global_replication_group(&request),
563            "CreateCacheSubnetGroup" => self.create_cache_subnet_group(&request),
564            "CreateReplicationGroup" => self.create_replication_group(&request).await,
565            "CreateServerlessCache" => self.create_serverless_cache(&request).await,
566            "CreateServerlessCacheSnapshot" => self.create_serverless_cache_snapshot(&request),
567            "CreateSnapshot" => self.create_snapshot(&request).await,
568            "CreateUser" => self.create_user(&request),
569            "CreateUserGroup" => self.create_user_group(&request),
570            "DecreaseReplicaCount" => self.decrease_replica_count(&request),
571            "DeleteCacheCluster" => self.delete_cache_cluster(&request).await,
572            "DeleteGlobalReplicationGroup" => self.delete_global_replication_group(&request),
573            "DeleteCacheSubnetGroup" => self.delete_cache_subnet_group(&request),
574            "DeleteReplicationGroup" => self.delete_replication_group(&request).await,
575            "DeleteServerlessCache" => self.delete_serverless_cache(&request).await,
576            "DeleteServerlessCacheSnapshot" => self.delete_serverless_cache_snapshot(&request),
577            "DeleteSnapshot" => self.delete_snapshot(&request),
578            "DeleteUser" => self.delete_user(&request),
579            "DeleteUserGroup" => self.delete_user_group(&request),
580            "DescribeCacheClusters" => self.describe_cache_clusters(&request),
581            "DescribeCacheEngineVersions" => self.describe_cache_engine_versions(&request),
582            "DescribeGlobalReplicationGroups" => self.describe_global_replication_groups(&request),
583            "DescribeCacheParameterGroups" => self.describe_cache_parameter_groups(&request),
584            "DescribeReservedCacheNodes" => self.describe_reserved_cache_nodes(&request),
585            "DescribeReservedCacheNodesOfferings" => {
586                self.describe_reserved_cache_nodes_offerings(&request)
587            }
588            "DescribeCacheSubnetGroups" => self.describe_cache_subnet_groups(&request),
589            "DescribeEngineDefaultParameters" => self.describe_engine_default_parameters(&request),
590            "DescribeReplicationGroups" => self.describe_replication_groups(&request),
591            "DescribeServerlessCaches" => self.describe_serverless_caches(&request),
592            "DescribeServerlessCacheSnapshots" => {
593                self.describe_serverless_cache_snapshots(&request)
594            }
595            "DescribeSnapshots" => self.describe_snapshots(&request),
596            "DescribeUserGroups" => self.describe_user_groups(&request),
597            "DescribeUsers" => self.describe_users(&request),
598            "DisassociateGlobalReplicationGroup" => {
599                self.disassociate_global_replication_group(&request)
600            }
601            "FailoverGlobalReplicationGroup" => self.failover_global_replication_group(&request),
602            "IncreaseReplicaCount" => self.increase_replica_count(&request),
603            "ListTagsForResource" => self.list_tags_for_resource(&request),
604            "ModifyCacheSubnetGroup" => self.modify_cache_subnet_group(&request),
605            "ModifyGlobalReplicationGroup" => self.modify_global_replication_group(&request),
606            "ModifyReplicationGroup" => self.modify_replication_group(&request).await,
607            "ModifyServerlessCache" => self.modify_serverless_cache(&request),
608            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
609            "TestFailover" => self.test_failover(&request),
610            "AuthorizeCacheSecurityGroupIngress" => {
611                self.authorize_cache_security_group_ingress(&request)
612            }
613            "RevokeCacheSecurityGroupIngress" => self.revoke_cache_security_group_ingress(&request),
614            "CreateCacheSecurityGroup" => self.create_cache_security_group(&request),
615            "DeleteCacheSecurityGroup" => self.delete_cache_security_group(&request),
616            "DescribeCacheSecurityGroups" => self.describe_cache_security_groups(&request),
617            "CreateCacheParameterGroup" => self.create_cache_parameter_group(&request),
618            "DeleteCacheParameterGroup" => self.delete_cache_parameter_group(&request),
619            "ModifyCacheParameterGroup" => self.modify_cache_parameter_group(&request).await,
620            "ResetCacheParameterGroup" => self.reset_cache_parameter_group(&request),
621            "DescribeCacheParameters" => self.describe_cache_parameters(&request),
622            "ModifyCacheCluster" => self.modify_cache_cluster(&request),
623            "RebootCacheCluster" => self.reboot_cache_cluster(&request).await,
624            "ListAllowedNodeTypeModifications" => {
625                self.list_allowed_node_type_modifications(&request)
626            }
627            "ModifyReplicationGroupShardConfiguration" => {
628                self.modify_replication_group_shard_configuration(&request)
629            }
630            "DecreaseNodeGroupsInGlobalReplicationGroup" => {
631                self.decrease_node_groups_in_global_replication_group(&request)
632            }
633            "IncreaseNodeGroupsInGlobalReplicationGroup" => {
634                self.increase_node_groups_in_global_replication_group(&request)
635            }
636            "RebalanceSlotsInGlobalReplicationGroup" => {
637                self.rebalance_slots_in_global_replication_group(&request)
638            }
639            "ModifyUser" => self.modify_user(&request).await,
640            "ModifyUserGroup" => self.modify_user_group(&request).await,
641            "PurchaseReservedCacheNodesOffering" => {
642                self.purchase_reserved_cache_nodes_offering(&request)
643            }
644            "DescribeEvents" => self.describe_events(&request),
645            "DescribeServiceUpdates" => self.describe_service_updates(&request),
646            "DescribeUpdateActions" => self.describe_update_actions(&request),
647            "BatchApplyUpdateAction" => self.batch_apply_update_action(&request),
648            "BatchStopUpdateAction" => self.batch_stop_update_action(&request),
649            "CopySnapshot" => self.copy_snapshot(&request),
650            "CopyServerlessCacheSnapshot" => self.copy_serverless_cache_snapshot(&request),
651            "ExportServerlessCacheSnapshot" => self.export_serverless_cache_snapshot(&request),
652            "StartMigration" => self.start_migration(&request),
653            "CompleteMigration" => self.complete_migration(&request),
654            "TestMigration" => self.test_migration(&request),
655            _ => Err(AwsServiceError::action_not_implemented(
656                self.service_name(),
657                &request.action,
658            )),
659        };
660        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
661            self.save_snapshot().await;
662        }
663        result
664    }
665
    /// Returns the module's `SUPPORTED_ACTIONS` table unchanged.
    fn supported_actions(&self) -> &[&str] {
        SUPPORTED_ACTIONS
    }
669}
670
671impl ElastiCacheService {
672    // ── Migrations ──
673
674    fn start_migration(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
675        self.migration_op(request, "StartMigration", "queued")
676    }
677
678    fn complete_migration(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
679        let id = required_query_param(request, "ReplicationGroupId")?;
680        let mut accounts = self.state.write();
681        let state = accounts.get_or_create(&request.account_id);
682        // Validate all prerequisites BEFORE mutating migration.status —
683        // the prior order flipped status to "complete" and only then
684        // surfaced ReplicationGroupNotFoundFault on the lookup, leaving
685        // a stale "complete" status on the migration.
686        if !state.migrations.contains_key(&id) {
687            return Err(AwsServiceError::aws_error(
688                StatusCode::NOT_FOUND,
689                "ReplicationGroupNotUnderMigrationFault",
690                format!("ReplicationGroup {id} is not currently being migrated."),
691            ));
692        }
693        if !state.replication_groups.contains_key(&id) {
694            return Err(AwsServiceError::aws_error(
695                StatusCode::NOT_FOUND,
696                "ReplicationGroupNotFoundFault",
697                format!("ReplicationGroup {id} not found."),
698            ));
699        }
700        let migration = state.migrations.get_mut(&id).expect("checked above");
701        migration.status = "complete".to_string();
702        let group = state.replication_groups.get(&id).expect("checked above");
703        let region = state.region.clone();
704        let xml = replication_group_xml(group, &region);
705        Ok(AwsResponse::xml(
706            StatusCode::OK,
707            query_response_xml(
708                "CompleteMigration",
709                ELASTICACHE_NS,
710                &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
711                &request.request_id,
712            ),
713        ))
714    }
715
716    fn test_migration(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
717        self.migration_op(request, "TestMigration", "test-passed")
718    }
719
720    fn migration_op(
721        &self,
722        request: &AwsRequest,
723        action: &str,
724        status: &str,
725    ) -> Result<AwsResponse, AwsServiceError> {
726        let id = required_query_param(request, "ReplicationGroupId")?;
727        // AWS Query protocol nests indexed members under .{index}.{Field},
728        // not .{Field}.{index}.
729        let endpoint_addr =
730            collect_member_field(request, "CustomerNodeEndpointList.member", "Address")
731                .into_iter()
732                .next()
733                .unwrap_or_else(|| "127.0.0.1".to_string());
734        let endpoint_port =
735            collect_member_field(request, "CustomerNodeEndpointList.member", "Port")
736                .into_iter()
737                .next()
738                .and_then(|v| v.parse::<i32>().ok())
739                .unwrap_or(6379);
740
741        let mut accounts = self.state.write();
742        let state = accounts.get_or_create(&request.account_id);
743        let group = state.replication_groups.get(&id).ok_or_else(|| {
744            AwsServiceError::aws_error(
745                StatusCode::NOT_FOUND,
746                "ReplicationGroupNotFoundFault",
747                format!("ReplicationGroup {id} not found."),
748            )
749        })?;
750        let region = state.region.clone();
751        let xml = replication_group_xml(group, &region);
752        state.migrations.insert(
753            id.clone(),
754            crate::state::Migration {
755                replication_group_id: id,
756                customer_node_endpoint_address: endpoint_addr,
757                customer_node_endpoint_port: endpoint_port,
758                status: status.to_string(),
759                started_at: chrono::Utc::now().to_rfc3339(),
760            },
761        );
762        Ok(AwsResponse::xml(
763            StatusCode::OK,
764            query_response_xml(
765                action,
766                ELASTICACHE_NS,
767                &format!("<ReplicationGroup>{xml}</ReplicationGroup>"),
768                &request.request_id,
769            ),
770        ))
771    }
772
773    /// Best-effort parameter application: run `CONFIG SET` for every
774    /// user-modified parameter on every Redis/Valkey cluster and
775    /// replication group that uses the given parameter group.
776    async fn apply_parameters_for_group(&self, account_id: &str, param_group_name: &str) {
777        let Some(runtime) = self.runtime.as_ref() else {
778            return;
779        };
780        let (target_ids, params) = {
781            let accounts = self.state.read();
782            let state = accounts.get(account_id);
783            let Some(state) = state else { return };
784            let mut target_ids = Vec::new();
785            for c in state.cache_clusters.values() {
786                if c.cache_parameter_group_name.as_deref() == Some(param_group_name)
787                    && (c.engine == ENGINE_REDIS || c.engine == ENGINE_VALKEY)
788                {
789                    target_ids.push(c.cache_cluster_id.clone());
790                }
791            }
792            for g in state.replication_groups.values() {
793                if g.cache_parameter_group_name.as_deref() == Some(param_group_name)
794                    && (g.engine == ENGINE_REDIS || g.engine == ENGINE_VALKEY)
795                {
796                    target_ids.push(g.replication_group_id.clone());
797                }
798            }
799            let params = state
800                .parameter_group_parameters
801                .get(param_group_name)
802                .cloned()
803                .unwrap_or_default();
804            (target_ids, params)
805        };
806        for id in target_ids {
807            for param in &params {
808                if !param.is_modifiable {
809                    continue;
810                }
811                let args = vec![
812                    "CONFIG".to_string(),
813                    "SET".to_string(),
814                    param.parameter_name.clone(),
815                    param.parameter_value.clone(),
816                ];
817                match runtime.exec_redis(&id, &args).await {
818                    Ok(output) if !output.success => {
819                        tracing::warn!(
820                            resource_id = %id,
821                            param = %param.parameter_name,
822                            stderr = %String::from_utf8_lossy(&output.stderr),
823                            "CONFIG SET failed"
824                        );
825                    }
826                    Err(e) => {
827                        tracing::warn!(
828                            resource_id = %id,
829                            param = %param.parameter_name,
830                            %e,
831                            "CONFIG SET exec failed"
832                        );
833                    }
834                    _ => {}
835                }
836            }
837        }
838    }
839}
840
841// Helpers
842
843#[path = "../helpers.rs"]
844mod helpers;
845use helpers::*;
846
847#[cfg(test)]
848#[path = "../service_tests.rs"]
849mod tests;