Skip to main content

fakecloud_elasticache/runtime/
mod.rs

1//! Backing-container runtime for ElastiCache.
2//!
3//! ElastiCache cache clusters / replication groups / serverless caches
4//! are backed by a real `redis` or `memcached` process. That process can
5//! run either as a local Docker/Podman container (the default) or as a
6//! native Kubernetes Pod (`FAKECLOUD_ELASTICACHE_BACKEND=k8s` or the
7//! global `FAKECLOUD_CONTAINER_BACKEND=k8s`). The [`ElastiCacheRuntime`]
8//! dispatches every operation to the selected [`CacheBackend`]; the
9//! shared k8s plumbing lives in the `fakecloud-k8s` crate.
10
11mod k8s;
12
13use std::collections::{BTreeMap, HashMap};
14use std::sync::Arc;
15use std::time::Duration;
16
17use parking_lot::RwLock;
18
19pub use k8s::PendingRdb;
20
/// The cache engine backing a resource.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheEngineKind {
    Redis,
    Memcached,
}

impl CacheEngineKind {
    /// Image reference the backing container is created from.
    fn image(self) -> &'static str {
        match self {
            Self::Memcached => "memcached:1.6-alpine",
            Self::Redis => "redis:7-alpine",
        }
    }

    /// Port the engine listens on inside its container.
    fn port(self) -> u16 {
        match self {
            Self::Memcached => 11211,
            Self::Redis => 6379,
        }
    }
}
45
/// A running cache backing instance (container or Pod).
///
/// Returned by the runtime's `ensure_*` methods, which also keep a copy in
/// the runtime's tracking map under the resource id.
#[derive(Debug, Clone)]
pub struct RunningCacheContainer {
    /// Backend-specific handle: a Docker container id, or a Pod name.
    pub container_id: String,
    /// The host port the engine is published on (Docker), or the engine's
    /// in-Pod port (k8s). Persisted in resource state.
    pub host_port: u16,
    /// Address clients connect to: `127.0.0.1` for Docker (published port
    /// on the host), or the Pod IP for k8s.
    ///
    /// The Docker backend copies this from its detected sibling host, so it
    /// is `host.docker.internal` instead when fakecloud itself runs in a
    /// container.
    pub endpoint_address: String,
    /// Port clients connect to: the published host port for Docker, the
    /// engine's standard port for k8s.
    ///
    /// On the Docker backend this always equals `host_port`.
    pub endpoint_port: u16,
    /// Which engine this is — used by the k8s backend to respawn on
    /// reboot.
    pub engine: CacheEngineKind,
}
64
/// Outcome of a `redis-cli` invocation, normalized across backends so
/// callers don't depend on `std::process::Output`.
///
/// The Docker backend fills this straight from the CLI process output. A
/// non-zero exit is reported through `success`, not as an `Err`.
#[derive(Debug, Clone)]
pub struct CacheExec {
    /// Whether the command exited 0.
    pub success: bool,
    /// Raw stdout bytes.
    pub stdout: Vec<u8>,
    /// Raw stderr bytes.
    pub stderr: Vec<u8>,
}
76
/// Errors returned by runtime operations on backing instances.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
    /// Returned by the runtime when an operation targets a resource id that
    /// has no instance in its tracking map.
    #[error("container runtime is unavailable")]
    Unavailable,
    /// A backend step failed; the payload is the failure text (CLI stderr or
    /// the I/O error message). Despite the name, the Docker backend also
    /// uses this variant for restart, exec, snapshot-copy and port-lookup
    /// failures.
    #[error("container failed to start: {0}")]
    ContainerStartFailed(String),
}
84
/// Error initializing the Kubernetes backend at startup. Surfaced to the
/// operator so a misconfigured cluster fails fast rather than silently
/// falling back to Docker.
#[derive(Debug, thiserror::Error)]
pub enum BackendInitError {
    /// A `fakecloud_k8s::K8sEnvError`, forwarded unchanged (its own message
    /// is displayed).
    #[error(transparent)]
    Env(#[from] fakecloud_k8s::K8sEnvError),
    /// A `fakecloud_k8s::K8sPodConfigError`, forwarded unchanged (its own
    /// message is displayed).
    #[error(transparent)]
    PodConfig(#[from] fakecloud_k8s::K8sPodConfigError),
    /// Connecting to the cluster failed; the payload is the failure text.
    #[error("failed to connect to the Kubernetes cluster: {0}")]
    Connect(String),
}
97
/// The selected backing-container backend.
///
/// Fixed when the runtime is constructed; every runtime operation matches on
/// it to pick the implementation.
#[derive(Debug, Clone)]
enum CacheBackend {
    /// Local container CLI (Docker/Podman) driven through subprocesses.
    Docker(DockerCache),
    /// Kubernetes Pods, implemented in the `k8s` submodule.
    K8s(k8s::K8sCache),
}
104
/// Entry point for managing cache backing instances.
///
/// Pairs the selected backend with a map of the instances this process has
/// started. The map sits behind an `Arc`, so clones of the runtime share the
/// same tracking state.
#[derive(Debug, Clone)]
pub struct ElastiCacheRuntime {
    /// Backend every operation is dispatched to.
    backend: CacheBackend,
    /// Running instances keyed by resource id.
    containers: Arc<RwLock<HashMap<String, RunningCacheContainer>>>,
}
110
111impl ElastiCacheRuntime {
112    /// Construct the Docker/Podman backend. Returns `None` when no
113    /// container CLI is available.
114    pub fn new() -> Option<Self> {
115        let cli = fakecloud_core::container_net::detect_container_cli()?;
116        let net = fakecloud_core::container_net::HostNetworking::detect(&cli);
117        Some(Self {
118            backend: CacheBackend::Docker(DockerCache {
119                cli,
120                net,
121                instance_id: format!("fakecloud-{}", std::process::id()),
122            }),
123            containers: Arc::new(RwLock::new(HashMap::new())),
124        })
125    }
126
127    /// Construct the Kubernetes backend. `server_port` is fakecloud's
128    /// bound port (used when `FAKECLOUD_K8S_SELF_URL` omits one);
129    /// `internal_token` guards the per-resource RDB endpoint that seeds
130    /// snapshot data into restored Redis Pods. Fails fast on
131    /// misconfiguration — never silently degrades.
132    pub async fn new_k8s(
133        server_port: u16,
134        internal_token: String,
135    ) -> Result<Self, BackendInitError> {
136        let cache = k8s::K8sCache::from_env(server_port, internal_token).await?;
137        Ok(Self {
138            backend: CacheBackend::K8s(cache),
139            containers: Arc::new(RwLock::new(HashMap::new())),
140        })
141    }
142
143    /// Name of the active backend, for logging.
144    pub fn cli_name(&self) -> &str {
145        match &self.backend {
146            CacheBackend::Docker(d) => &d.cli,
147            CacheBackend::K8s(_) => "kubernetes",
148        }
149    }
150
151    /// The pending-RDB map the server's internal endpoint serves from.
152    /// `None` on the Docker backend (which stages snapshots via the
153    /// daemon, not HTTP).
154    pub fn pending_rdb(&self) -> Option<PendingRdb> {
155        match &self.backend {
156            CacheBackend::K8s(k) => Some(k.pending_rdb()),
157            CacheBackend::Docker(_) => None,
158        }
159    }
160
161    /// Address fakecloud advertises for clients to reach a spawned cache
162    /// container, and uses for readiness probes. `127.0.0.1` on the host;
163    /// `host.docker.internal` when fakecloud is containerized (issue
164    /// #1539, bug 0.4). Only meaningful for the Docker backend (k8s
165    /// addresses are per-Pod and returned from `ensure_*`).
166    pub fn endpoint_host(&self) -> &str {
167        match &self.backend {
168            CacheBackend::Docker(d) => &d.net.sibling_host,
169            CacheBackend::K8s(_) => "127.0.0.1",
170        }
171    }
172
173    pub async fn ensure_redis(
174        &self,
175        resource_id: &str,
176        rdb_path: Option<&str>,
177        tags: &BTreeMap<String, String>,
178    ) -> Result<RunningCacheContainer, RuntimeError> {
179        let running = match &self.backend {
180            CacheBackend::Docker(d) => {
181                d.spawn_container(resource_id, CacheEngineKind::Redis, rdb_path)
182                    .await?
183            }
184            CacheBackend::K8s(k) => {
185                k.spawn_pod(resource_id, CacheEngineKind::Redis, rdb_path, tags)
186                    .await?
187            }
188        };
189        self.containers
190            .write()
191            .insert(resource_id.to_string(), running.clone());
192        Ok(running)
193    }
194
195    pub async fn ensure_memcached(
196        &self,
197        resource_id: &str,
198        tags: &BTreeMap<String, String>,
199    ) -> Result<RunningCacheContainer, RuntimeError> {
200        let running = match &self.backend {
201            CacheBackend::Docker(d) => {
202                d.spawn_container(resource_id, CacheEngineKind::Memcached, None)
203                    .await?
204            }
205            CacheBackend::K8s(k) => {
206                k.spawn_pod(resource_id, CacheEngineKind::Memcached, None, tags)
207                    .await?
208            }
209        };
210        self.containers
211            .write()
212            .insert(resource_id.to_string(), running.clone());
213        Ok(running)
214    }
215
216    pub async fn stop_container(&self, resource_id: &str) {
217        let container = self.containers.write().remove(resource_id);
218        if let Some(container) = container {
219            match &self.backend {
220                CacheBackend::Docker(d) => d.remove_container(&container.container_id).await,
221                CacheBackend::K8s(k) => k.delete_pod(&container.container_id).await,
222            }
223        }
224    }
225
226    /// Remove the persisted redis/valkey data volume for a resource. Called on
227    /// delete so a later resource reusing the identifier starts clean instead
228    /// of reloading the deleted resource's RDB. No-op on the k8s backend (PVC
229    /// lifecycle is handled there) and for memcached (no volume).
230    pub async fn remove_data_volume(&self, resource_id: &str) {
231        if let CacheBackend::Docker(d) = &self.backend {
232            d.remove_data_volume(resource_id).await;
233        }
234    }
235
236    /// Restart the underlying backing instance, mirroring real
237    /// ElastiCache's RebootCacheCluster behaviour. Returns `Unavailable`
238    /// if the resource has no live instance tracked here.
239    pub async fn restart_container(
240        &self,
241        resource_id: &str,
242        tags: &BTreeMap<String, String>,
243    ) -> Result<(), RuntimeError> {
244        let running = {
245            let containers = self.containers.read();
246            containers.get(resource_id).cloned()
247        };
248        let running = running.ok_or(RuntimeError::Unavailable)?;
249        match &self.backend {
250            CacheBackend::Docker(d) => d.restart_container(&running.container_id).await,
251            CacheBackend::K8s(k) => {
252                // A Pod can't be restarted in place; recreate it,
253                // preserving Redis data by snapshotting it across the
254                // recreate. The new Pod keeps the same deterministic name.
255                // Per-instance scheduling tags are re-applied to the fresh
256                // Pod so a reboot keeps the resource's node placement.
257                let updated = k.reboot_pod(resource_id, &running, tags).await?;
258                self.containers
259                    .write()
260                    .insert(resource_id.to_string(), updated);
261                Ok(())
262            }
263        }
264    }
265
266    /// Execute a `redis-cli` command inside a tracked instance.
267    pub async fn exec_redis(
268        &self,
269        resource_id: &str,
270        redis_args: &[String],
271    ) -> Result<CacheExec, RuntimeError> {
272        let container_id = {
273            let containers = self.containers.read();
274            containers
275                .get(resource_id)
276                .map(|c| c.container_id.clone())
277                .ok_or(RuntimeError::Unavailable)?
278        };
279        match &self.backend {
280            CacheBackend::Docker(d) => d.exec_redis(&container_id, redis_args).await,
281            CacheBackend::K8s(k) => k.exec_redis(&container_id, redis_args).await,
282        }
283    }
284
285    /// Trigger `SAVE` inside a running Redis instance and copy the
286    /// resulting `dump.rdb` out to `dest_path`.
287    pub async fn dump_rdb(&self, resource_id: &str, dest_path: &str) -> Result<(), RuntimeError> {
288        let container_id = {
289            let containers = self.containers.read();
290            containers
291                .get(resource_id)
292                .map(|c| c.container_id.clone())
293                .ok_or(RuntimeError::Unavailable)?
294        };
295        match &self.backend {
296            CacheBackend::Docker(d) => d.dump_rdb(&container_id, dest_path).await,
297            CacheBackend::K8s(k) => k.dump_rdb(&container_id, dest_path).await,
298        }
299    }
300
301    pub async fn stop_all(&self) {
302        let containers: Vec<RunningCacheContainer> = {
303            let mut containers = self.containers.write();
304            containers.drain().map(|(_, c)| c).collect()
305        };
306        for c in containers {
307            match &self.backend {
308                CacheBackend::Docker(d) => d.remove_container(&c.container_id).await,
309                CacheBackend::K8s(k) => k.delete_pod(&c.container_id).await,
310            }
311        }
312    }
313
314    /// Sweep cache Pods orphaned by a previous fakecloud process (k8s
315    /// only; the Docker backend relies on the shared reaper).
316    pub async fn reap_stale(&self) {
317        if let CacheBackend::K8s(k) = &self.backend {
318            k.reap_stale().await;
319        }
320    }
321}
322
/// Docker/Podman backend: shells out to the container CLI, exactly as
/// ElastiCache always has.
#[derive(Debug, Clone)]
struct DockerCache {
    /// Container CLI program that every subprocess invocation runs.
    cli: String,
    /// Detected host networking; its `sibling_host` is the address used for
    /// readiness probes and reported as the client endpoint.
    net: fakecloud_core::container_net::HostNetworking,
    /// Per-process id (`fakecloud-<pid>`) written to the
    /// `fakecloud-instance` label of every container this backend creates.
    instance_id: String,
}
331
332impl DockerCache {
333    async fn spawn_container(
334        &self,
335        resource_id: &str,
336        engine: CacheEngineKind,
337        rdb_path: Option<&str>,
338    ) -> Result<RunningCacheContainer, RuntimeError> {
339        let image = engine.image();
340        let container_port = engine.port();
341
342        let mut args: Vec<String> = vec![
343            "create".to_string(),
344            "-p".to_string(),
345            format!(":{container_port}"),
346            "--label".to_string(),
347            format!("fakecloud-elasticache={resource_id}"),
348            "--label".to_string(),
349            format!("fakecloud-instance={}", self.instance_id),
350        ];
351
352        // Persist redis/valkey data in a named volume at /data so a container
353        // recreated after a fakecloud restart reloads its RDB instead of
354        // coming back empty (bug-audit 2026-06-20, 4.2). A named volume (not a
355        // bind mount) is daemon-managed, so it works whether or not fakecloud
356        // is itself containerized. memcached is intentionally in-memory only,
357        // matching real ElastiCache (a reboot clears it), so it gets no
358        // volume.
359        if matches!(engine, CacheEngineKind::Redis) {
360            args.push("-v".to_string());
361            args.push(format!("{}:/data", data_volume_name(resource_id)));
362        }
363        args.push(image.to_string());
364
365        let output = tokio::process::Command::new(&self.cli)
366            .args(&args)
367            .output()
368            .await
369            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
370
371        if !output.status.success() {
372            return Err(RuntimeError::ContainerStartFailed(
373                String::from_utf8_lossy(&output.stderr).trim().to_string(),
374            ));
375        }
376
377        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
378
379        // Stage the snapshot RDB into the created (not yet started)
380        // container via `docker cp` rather than a `-v` bind mount. A bind
381        // mount of a host path breaks when fakecloud runs in a container
382        // (`FAKECLOUD_IN_CONTAINER=1`): the rdb is written inside
383        // fakecloud's own filesystem, but the host daemon resolves the bind
384        // source against the *host* filesystem, silently yielding an empty
385        // cache. `docker cp` copies the bytes across the daemon, so it works
386        // on host and in-container alike (issue #1539, bug 0.7). Redis loads
387        // /data/dump.rdb at startup, so the copy must precede `start`.
388        if let Some(path) = rdb_path {
389            let cp_result = tokio::process::Command::new(&self.cli)
390                .args(["cp", path, &format!("{container_id}:/data/dump.rdb")])
391                .output()
392                .await
393                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
394            if !cp_result.status.success() {
395                self.remove_container(&container_id).await;
396                return Err(RuntimeError::ContainerStartFailed(format!(
397                    "failed to stage snapshot rdb into container: {}",
398                    String::from_utf8_lossy(&cp_result.stderr).trim()
399                )));
400            }
401        }
402
403        let start_result = tokio::process::Command::new(&self.cli)
404            .args(["start", &container_id])
405            .output()
406            .await
407            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
408
409        if !start_result.status.success() {
410            self.remove_container(&container_id).await;
411            return Err(RuntimeError::ContainerStartFailed(format!(
412                "container start failed: {}",
413                String::from_utf8_lossy(&start_result.stderr).trim()
414            )));
415        }
416
417        let host_port = match self.lookup_port(&container_id, container_port).await {
418            Ok(host_port) => host_port,
419            Err(error) => {
420                self.remove_container(&container_id).await;
421                return Err(error);
422            }
423        };
424
425        let wait_result = match engine {
426            CacheEngineKind::Redis => self.wait_for_redis(host_port).await,
427            CacheEngineKind::Memcached => self.wait_for_memcached(host_port).await,
428        };
429        if let Err(error) = wait_result {
430            self.remove_container(&container_id).await;
431            return Err(error);
432        }
433
434        Ok(RunningCacheContainer {
435            container_id,
436            host_port,
437            // sibling_host is 127.0.0.1 on the host (CI, unit tests) and
438            // host.docker.internal when fakecloud itself is containerized
439            // (issue #1539) — the address a client actually reaches the
440            // published port at.
441            endpoint_address: self.net.sibling_host.clone(),
442            endpoint_port: host_port,
443            engine,
444        })
445    }
446
447    async fn restart_container(&self, container_id: &str) -> Result<(), RuntimeError> {
448        let output = tokio::process::Command::new(&self.cli)
449            .args(["restart", container_id])
450            .output()
451            .await
452            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
453        if !output.status.success() {
454            return Err(RuntimeError::ContainerStartFailed(
455                String::from_utf8_lossy(&output.stderr).trim().to_string(),
456            ));
457        }
458        Ok(())
459    }
460
461    async fn exec_redis(
462        &self,
463        container_id: &str,
464        redis_args: &[String],
465    ) -> Result<CacheExec, RuntimeError> {
466        let mut args = vec![
467            "exec".to_string(),
468            container_id.to_string(),
469            "redis-cli".to_string(),
470        ];
471        args.extend_from_slice(redis_args);
472        let out = tokio::process::Command::new(&self.cli)
473            .args(&args)
474            .output()
475            .await
476            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
477        Ok(CacheExec {
478            success: out.status.success(),
479            stdout: out.stdout,
480            stderr: out.stderr,
481        })
482    }
483
484    async fn dump_rdb(&self, container_id: &str, dest_path: &str) -> Result<(), RuntimeError> {
485        let save_output = tokio::process::Command::new(&self.cli)
486            .args(["exec", container_id, "redis-cli", "SAVE"])
487            .output()
488            .await
489            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
490        if !save_output.status.success() {
491            return Err(RuntimeError::ContainerStartFailed(
492                String::from_utf8_lossy(&save_output.stderr)
493                    .trim()
494                    .to_string(),
495            ));
496        }
497
498        let cp_output = tokio::process::Command::new(&self.cli)
499            .args(["cp", &format!("{container_id}:/data/dump.rdb"), dest_path])
500            .output()
501            .await
502            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
503        if !cp_output.status.success() {
504            return Err(RuntimeError::ContainerStartFailed(
505                String::from_utf8_lossy(&cp_output.stderr)
506                    .trim()
507                    .to_string(),
508            ));
509        }
510        Ok(())
511    }
512
513    async fn lookup_port(
514        &self,
515        container_id: &str,
516        container_port: u16,
517    ) -> Result<u16, RuntimeError> {
518        let port_output = tokio::process::Command::new(&self.cli)
519            .args(["port", container_id, &container_port.to_string()])
520            .output()
521            .await
522            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
523
524        if !port_output.status.success() {
525            let stderr = String::from_utf8_lossy(&port_output.stderr);
526            return Err(RuntimeError::ContainerStartFailed(format!(
527                "port lookup failed: {stderr}"
528            )));
529        }
530
531        let port_str = String::from_utf8_lossy(&port_output.stdout);
532        port_str
533            .trim()
534            .rsplit(':')
535            .next()
536            .and_then(|value| value.parse::<u16>().ok())
537            .ok_or_else(|| {
538                RuntimeError::ContainerStartFailed(format!(
539                    "could not determine redis port from '{}'",
540                    port_str.trim()
541                ))
542            })
543    }
544
545    async fn wait_for_redis(&self, host_port: u16) -> Result<(), RuntimeError> {
546        // Probe the same address clients reach the published port at:
547        // 127.0.0.1 on the host, host.docker.internal /
548        // host.containers.internal when fakecloud is containerized (#1539).
549        let host = &self.net.sibling_host;
550        for _ in 0..40 {
551            tokio::time::sleep(Duration::from_millis(500)).await;
552            if tokio::net::TcpStream::connect(format!("{host}:{host_port}"))
553                .await
554                .is_ok()
555            {
556                return Ok(());
557            }
558        }
559
560        Err(RuntimeError::ContainerStartFailed(
561            "redis container did not become ready within 20 seconds".to_string(),
562        ))
563    }
564
565    async fn wait_for_memcached(&self, host_port: u16) -> Result<(), RuntimeError> {
566        use tokio::io::{AsyncReadExt, AsyncWriteExt};
567        let host = &self.net.sibling_host;
568        for _ in 0..40 {
569            tokio::time::sleep(Duration::from_millis(500)).await;
570            let Ok(mut stream) =
571                tokio::net::TcpStream::connect(format!("{host}:{host_port}")).await
572            else {
573                continue;
574            };
575            if stream.write_all(b"version\r\n").await.is_err() {
576                continue;
577            }
578            let mut buf = [0u8; 32];
579            match tokio::time::timeout(Duration::from_secs(2), stream.read(&mut buf)).await {
580                Ok(Ok(n)) if n > 0 && buf.starts_with(b"VERSION") => return Ok(()),
581                _ => continue,
582            }
583        }
584
585        Err(RuntimeError::ContainerStartFailed(
586            "memcached container did not become ready within 20 seconds".to_string(),
587        ))
588    }
589
590    async fn remove_container(&self, container_id: &str) {
591        let _ = tokio::process::Command::new(&self.cli)
592            .args(["rm", "-f", container_id])
593            .output()
594            .await;
595    }
596
597    async fn remove_data_volume(&self, resource_id: &str) {
598        let _ = tokio::process::Command::new(&self.cli)
599            .args(["volume", "rm", "-f", &data_volume_name(resource_id)])
600            .output()
601            .await;
602    }
603}
604
/// Name of the Docker volume holding a resource's redis data directory.
///
/// The name depends only on `resource_id` — never on the per-process
/// fakecloud instance id — so the same volume is found again after a
/// fakecloud restart. Every character outside Docker's `[a-zA-Z0-9_.-]`
/// volume-name set becomes a single `-`.
fn data_volume_name(resource_id: &str) -> String {
    const PREFIX: &str = "fakecloud-elasticache-data-";

    let mut name = String::with_capacity(PREFIX.len() + resource_id.len());
    name.push_str(PREFIX);
    for ch in resource_id.chars() {
        let allowed = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '.' | '-');
        name.push(if allowed { ch } else { '-' });
    }
    name
}
622
#[cfg(test)]
mod tests {
    use super::*;

    /// The volume name is a pure function of the resource id (no per-process
    /// instance id is involved), and characters Docker does not accept in a
    /// volume name are replaced with `-`.
    #[test]
    fn data_volume_name_is_stable_and_sanitized() {
        let cases = [
            ("my-cache", "fakecloud-elasticache-data-my-cache"),
            ("weird/id:1", "fakecloud-elasticache-data-weird-id-1"),
        ];
        for (resource_id, expected) in cases {
            assert_eq!(data_volume_name(resource_id), expected);
        }
    }
}