Skip to main content

fakecloud_elasticache/runtime/
k8s.rs

1//! Kubernetes backend for ElastiCache backing instances.
2//!
3//! Runs each cache as a native Pod (one `redis`/`memcached` container)
4//! instead of a Docker container. Shared client/lifecycle/exec/reaping
5//! plumbing comes from the `fakecloud-k8s` crate; this module only builds
6//! the cache Pod spec and maps the ElastiCache runtime operations onto
7//! Pod operations.
8//!
9//! Snapshot restore: a restored Redis Pod's container `wget`s the
10//! snapshot RDB from a per-process, bearer-token-guarded fakecloud
11//! endpoint into `/data/dump.rdb` before `exec`ing `redis-server`, so the
12//! engine loads it at startup — the k8s analogue of the Docker path's
13//! `docker cp` into the created container.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Duration;
18
19use k8s_openapi::api::core::v1::{
20    Container, ContainerPort, EnvVar, LocalObjectReference, Pod, PodSpec,
21};
22use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
23use parking_lot::RwLock;
24
25use fakecloud_k8s::{labels, names, K8sClient, K8sEnv, K8sPodConfig};
26
27use super::{BackendInitError, CacheEngineKind, CacheExec, RunningCacheContainer, RuntimeError};
28
/// Pod-name prefix for every cache Pod. Deliberately short so that the
/// resource slug plus the hash suffix still fit inside the 63-character
/// DNS label limit.
const POD_PREFIX: &str = "fakecloud-ec";

/// Value of the `fakecloud-service` label stamped on every cache Pod.
const SERVICE: &str = "elasticache";

/// Name of the single container running inside each cache Pod.
const CONTAINER: &str = "cache";
36
/// Snapshot RDB payloads staged for in-flight restores, keyed by Pod
/// name. The server's internal endpoint serves bytes from here; the
/// restored Pod's container fetches them at startup. Shared (cloneable
/// `Arc`) so the server route and the runtime see the same map.
///
/// The `parking_lot::RwLock` is only ever locked for a single insert,
/// remove, or lookup at a time in this module, and never across an
/// `.await`.
pub type PendingRdb = Arc<RwLock<HashMap<String, Vec<u8>>>>;
42
43#[derive(Clone)]
44pub(super) struct K8sCache {
45    client: K8sClient,
46    /// In-cluster fakecloud base URL, e.g.
47    /// `http://fakecloud.fakecloud.svc.cluster.local:4566`.
48    self_url: String,
49    /// Bearer token the restore fetch presents to the RDB endpoint.
50    internal_token: String,
51    /// Optional `imagePullSecrets` name for private registries.
52    pull_secret: Option<String>,
53    /// Global + ElastiCache-service node selector / tolerations /
54    /// annotations applied to every cache Pod.
55    pod_config: K8sPodConfig,
56    pending_rdb: PendingRdb,
57}
58
59impl std::fmt::Debug for K8sCache {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct("K8sCache")
62            .field("namespace", &self.client.namespace())
63            .field("self_url", &self.self_url)
64            .finish_non_exhaustive()
65    }
66}
67
68impl K8sCache {
69    pub(super) async fn from_env(
70        server_port: u16,
71        internal_token: String,
72    ) -> Result<Self, BackendInitError> {
73        let env = K8sEnv::from_env(server_port)?;
74        let pod_config = K8sPodConfig::resolved_base("FAKECLOUD_ELASTICACHE_K8S")?;
75        let client = K8sClient::connect(env.namespace.clone())
76            .await
77            .map_err(|e| BackendInitError::Connect(e.to_string()))?;
78        tracing::info!(
79            namespace = %env.namespace,
80            self_url = %env.self_url,
81            "K8s ElastiCache backend initialized"
82        );
83        Ok(Self {
84            client,
85            self_url: env.self_url,
86            internal_token,
87            pull_secret: env.pull_secret,
88            pod_config,
89            pending_rdb: Arc::new(RwLock::new(HashMap::new())),
90        })
91    }
92
93    pub(super) fn pending_rdb(&self) -> PendingRdb {
94        self.pending_rdb.clone()
95    }
96
97    /// Spawn a cache Pod, reading the snapshot RDB from `rdb_path` (if
98    /// any) into memory first.
99    pub(super) async fn spawn_pod(
100        &self,
101        resource_id: &str,
102        engine: CacheEngineKind,
103        rdb_path: Option<&str>,
104        tags: &std::collections::BTreeMap<String, String>,
105    ) -> Result<RunningCacheContainer, RuntimeError> {
106        let rdb = match rdb_path {
107            Some(path) => Some(tokio::fs::read(path).await.map_err(|e| {
108                RuntimeError::ContainerStartFailed(format!("reading snapshot rdb {path}: {e}"))
109            })?),
110            None => None,
111        };
112        self.spawn_pod_bytes(resource_id, engine, rdb, tags).await
113    }
114
115    async fn spawn_pod_bytes(
116        &self,
117        resource_id: &str,
118        engine: CacheEngineKind,
119        rdb: Option<Vec<u8>>,
120        tags: &std::collections::BTreeMap<String, String>,
121    ) -> Result<RunningCacheContainer, RuntimeError> {
122        let pod_name = names::pod_name(POD_PREFIX, resource_id, resource_id);
123        let port = engine.port();
124
125        // Stage snapshot bytes (Redis only) for the Pod to fetch, then
126        // build the spec referencing the RDB endpoint.
127        let rdb_url = if matches!(engine, CacheEngineKind::Redis) {
128            if let Some(bytes) = rdb {
129                self.pending_rdb.write().insert(pod_name.clone(), bytes);
130                Some(format!(
131                    "{}/_fakecloud/elasticache/_internal/rdb/{}",
132                    self.self_url.trim_end_matches('/'),
133                    pod_name
134                ))
135            } else {
136                None
137            }
138        } else {
139            None
140        };
141
142        let mut pod = build_cache_pod(CachePodContext {
143            pod_name: &pod_name,
144            namespace: self.client.namespace(),
145            instance_id: self.client.instance_id(),
146            resource_id,
147            image: engine.image(),
148            port,
149            rdb_url: rdb_url.as_deref(),
150            internal_token: &self.internal_token,
151            pull_secret: self.pull_secret.as_deref(),
152        });
153        // Operator-configured global + ElastiCache-service base, with this
154        // resource's reserved `fakecloud-k8s/*` tag overrides merged over it
155        // (per-instance wins). Every spawn path (create, serverless,
156        // replication, recovery, reboot) funnels through here, so this
157        // covers them all.
158        self.pod_config
159            .clone()
160            .merge(K8sPodConfig::from_tags(tags))
161            .apply(&mut pod);
162
163        let result = self.launch(&pod, &pod_name, port, engine).await;
164        // Whatever happened, the staged RDB is no longer needed: a
165        // successful Pod already fetched it; a failed one won't retry.
166        self.pending_rdb.write().remove(&pod_name);
167        result
168    }
169
170    async fn launch(
171        &self,
172        pod: &Pod,
173        pod_name: &str,
174        port: u16,
175        engine: CacheEngineKind,
176    ) -> Result<RunningCacheContainer, RuntimeError> {
177        self.client
178            .create_pod(pod)
179            .await
180            .map_err(|e| RuntimeError::ContainerStartFailed(format!("create cache pod: {e}")))?;
181
182        let pod_ip = match self
183            .client
184            .wait_for_pod_ip(pod_name, Duration::from_secs(90))
185            .await
186        {
187            Ok(ip) => ip,
188            Err(e) => {
189                self.client.delete_pod(pod_name).await;
190                return Err(RuntimeError::ContainerStartFailed(e.to_string()));
191            }
192        };
193        if let Err(e) = K8sClient::wait_for_tcp(&pod_ip, port, Duration::from_secs(30)).await {
194            self.client.delete_pod(pod_name).await;
195            return Err(RuntimeError::ContainerStartFailed(format!(
196                "cache pod {pod_name} ({pod_ip}:{port}) not ready: {e}"
197            )));
198        }
199
200        Ok(RunningCacheContainer {
201            container_id: pod_name.to_string(),
202            host_port: port,
203            endpoint_address: pod_ip,
204            endpoint_port: port,
205            engine,
206        })
207    }
208
209    pub(super) async fn delete_pod(&self, pod_name: &str) {
210        self.client.delete_pod(pod_name).await;
211    }
212
213    pub(super) async fn exec_redis(
214        &self,
215        pod_name: &str,
216        redis_args: &[String],
217    ) -> Result<CacheExec, RuntimeError> {
218        let mut cmd: Vec<&str> = vec!["redis-cli"];
219        cmd.extend(redis_args.iter().map(String::as_str));
220        let out = self
221            .client
222            .exec(pod_name, Some(CONTAINER), &cmd)
223            .await
224            .map_err(|e| RuntimeError::ContainerStartFailed(format!("exec redis-cli: {e}")))?;
225        Ok(CacheExec {
226            success: out.success(),
227            stdout: out.stdout,
228            stderr: out.stderr.into_bytes(),
229        })
230    }
231
232    pub(super) async fn dump_rdb(
233        &self,
234        pod_name: &str,
235        dest_path: &str,
236    ) -> Result<(), RuntimeError> {
237        let save = self
238            .client
239            .exec(pod_name, Some(CONTAINER), &["redis-cli", "SAVE"])
240            .await
241            .map_err(|e| RuntimeError::ContainerStartFailed(format!("exec SAVE: {e}")))?;
242        if !save.success() {
243            return Err(RuntimeError::ContainerStartFailed(format!(
244                "redis SAVE failed: {}",
245                save.stderr.trim()
246            )));
247        }
248        let cat = self
249            .client
250            .exec(pod_name, Some(CONTAINER), &["cat", "/data/dump.rdb"])
251            .await
252            .map_err(|e| RuntimeError::ContainerStartFailed(format!("exec cat rdb: {e}")))?;
253        if !cat.success() {
254            return Err(RuntimeError::ContainerStartFailed(format!(
255                "reading dump.rdb from pod failed: {}",
256                cat.stderr.trim()
257            )));
258        }
259        tokio::fs::write(dest_path, &cat.stdout)
260            .await
261            .map_err(|e| RuntimeError::ContainerStartFailed(format!("writing {dest_path}: {e}")))?;
262        Ok(())
263    }
264
265    /// Reboot = recreate the Pod. A Pod can't be restarted in place, so
266    /// for Redis we snapshot the live data and reload it into the fresh
267    /// Pod, preserving the dataset across the reboot (matching the Docker
268    /// backend's in-place `docker restart`). Memcached has no
269    /// persistence, so its reboot is a plain recreate (a flush — which is
270    /// also what a real memcached reboot does).
271    pub(super) async fn reboot_pod(
272        &self,
273        resource_id: &str,
274        running: &RunningCacheContainer,
275        tags: &std::collections::BTreeMap<String, String>,
276    ) -> Result<RunningCacheContainer, RuntimeError> {
277        let preserved = if matches!(running.engine, CacheEngineKind::Redis) {
278            self.snapshot_live_rdb(&running.container_id).await
279        } else {
280            None
281        };
282        self.client.delete_pod(&running.container_id).await;
283        self.spawn_pod_bytes(resource_id, running.engine, preserved, tags)
284            .await
285    }
286
287    /// Best-effort `SAVE` + read of the current `dump.rdb` out of a Pod,
288    /// returning the bytes. Returns `None` (start empty) if the snapshot
289    /// can't be captured rather than failing the reboot.
290    async fn snapshot_live_rdb(&self, pod_name: &str) -> Option<Vec<u8>> {
291        let _ = self
292            .client
293            .exec(pod_name, Some(CONTAINER), &["redis-cli", "SAVE"])
294            .await;
295        let cat = self
296            .client
297            .exec(pod_name, Some(CONTAINER), &["cat", "/data/dump.rdb"])
298            .await
299            .ok()?;
300        if cat.success() && !cat.stdout.is_empty() {
301            Some(cat.stdout)
302        } else {
303            None
304        }
305    }
306
307    pub(super) async fn reap_stale(&self) {
308        self.client.reap_stale(SERVICE).await;
309    }
310}
311
/// Everything [`build_cache_pod`] needs to render one cache Pod.
struct CachePodContext<'a> {
    // --- Pod identity and ownership labels ---
    /// Name given to the Pod object.
    pod_name: &'a str,
    /// Namespace the Pod is created in.
    namespace: &'a str,
    /// ElastiCache resource id; a label-safe form is stored as a label.
    resource_id: &'a str,
    /// fakecloud instance id recorded in the ownership labels.
    instance_id: &'a str,

    // --- Container ---
    /// Engine image to run.
    image: &'a str,
    /// Engine port exposed by the container.
    port: u16,

    // --- Redis snapshot restore ---
    /// Bearer token the container presents when downloading `rdb_url`.
    internal_token: &'a str,
    /// When set (Redis restore), the container downloads this URL into
    /// `/data/dump.rdb` before starting `redis-server`.
    rdb_url: Option<&'a str>,

    // --- Registry access ---
    /// Optional `imagePullSecrets` name for private registries.
    pull_secret: Option<&'a str>,
}
326
327/// Build the Pod spec for one cache backing instance. Pure — no cluster
328/// I/O — so it's unit-testable.
329fn build_cache_pod(ctx: CachePodContext<'_>) -> Pod {
330    let mut pod_labels = std::collections::BTreeMap::new();
331    pod_labels.insert(
332        labels::MANAGED_BY.to_string(),
333        labels::MANAGED_BY_VALUE.to_string(),
334    );
335    pod_labels.insert(labels::INSTANCE.to_string(), ctx.instance_id.to_string());
336    pod_labels.insert(labels::SERVICE.to_string(), SERVICE.to_string());
337    pod_labels.insert(
338        "fakecloud-elasticache".to_string(),
339        names::label_safe(ctx.resource_id),
340    );
341
342    // When restoring a snapshot, override the command to fetch the RDB
343    // into /data before launching redis-server. Otherwise use the image's
344    // default entrypoint.
345    let (command, env) = match ctx.rdb_url {
346        Some(url) => {
347            let script = "set -e; \
348                 wget -q --header=\"authorization: Bearer $FAKECLOUD_RDB_TOKEN\" \
349                      -O /data/dump.rdb \"$FAKECLOUD_RDB_URL\"; \
350                 exec redis-server"
351                .to_string();
352            (
353                Some(vec!["sh".to_string(), "-c".to_string(), script]),
354                Some(vec![
355                    EnvVar {
356                        name: "FAKECLOUD_RDB_URL".to_string(),
357                        value: Some(url.to_string()),
358                        value_from: None,
359                    },
360                    EnvVar {
361                        name: "FAKECLOUD_RDB_TOKEN".to_string(),
362                        value: Some(ctx.internal_token.to_string()),
363                        value_from: None,
364                    },
365                ]),
366            )
367        }
368        None => (None, None),
369    };
370
371    let container = Container {
372        name: CONTAINER.to_string(),
373        image: Some(ctx.image.to_string()),
374        command,
375        env,
376        ports: Some(vec![ContainerPort {
377            container_port: ctx.port as i32,
378            ..ContainerPort::default()
379        }]),
380        ..Container::default()
381    };
382
383    let pull_secrets = ctx.pull_secret.map(|name| {
384        vec![LocalObjectReference {
385            name: name.to_string(),
386        }]
387    });
388
389    Pod {
390        metadata: ObjectMeta {
391            name: Some(ctx.pod_name.to_string()),
392            namespace: Some(ctx.namespace.to_string()),
393            labels: Some(pod_labels),
394            ..ObjectMeta::default()
395        },
396        spec: Some(PodSpec {
397            // We manage lifecycle explicitly (reboot recreates the Pod),
398            // so the kubelet shouldn't restart the container itself.
399            restart_policy: Some("Never".to_string()),
400            containers: vec![container],
401            image_pull_secrets: pull_secrets,
402            ..PodSpec::default()
403        }),
404        ..Pod::default()
405    }
406}
407
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline Redis cache context; individual tests override fields.
    fn ctx<'a>(rdb_url: Option<&'a str>) -> CachePodContext<'a> {
        CachePodContext {
            pod_name: "fakecloud-ec-mycache-abc123",
            namespace: "fakecloud",
            instance_id: "fakecloud-1234",
            resource_id: "My_Cache",
            image: "redis:7-alpine",
            port: 6379,
            rdb_url,
            internal_token: "secret-token",
            pull_secret: None,
        }
    }

    #[test]
    fn pod_has_ownership_labels() {
        let pod = build_cache_pod(ctx(None));
        let l = pod.metadata.labels.unwrap();
        assert_eq!(l.get(labels::MANAGED_BY).unwrap(), labels::MANAGED_BY_VALUE);
        assert_eq!(l.get(labels::SERVICE).unwrap(), "elasticache");
        assert_eq!(l.get(labels::INSTANCE).unwrap(), "fakecloud-1234");
        // resource id slug is DNS-safe (underscores -> dashes, lowercased).
        assert_eq!(l.get("fakecloud-elasticache").unwrap(), "my-cache");
    }

    #[test]
    fn pod_metadata_carries_name_and_namespace() {
        let pod = build_cache_pod(ctx(None));
        assert_eq!(
            pod.metadata.name.as_deref(),
            Some("fakecloud-ec-mycache-abc123")
        );
        assert_eq!(pod.metadata.namespace.as_deref(), Some("fakecloud"));
    }

    #[test]
    fn container_exposes_engine_port_and_image() {
        let pod = build_cache_pod(ctx(None));
        let c = &pod.spec.unwrap().containers[0];
        // exec_redis / dump_rdb target the container by this name.
        assert_eq!(c.name, CONTAINER);
        assert_eq!(c.image.as_deref(), Some("redis:7-alpine"));
        assert_eq!(c.ports.as_ref().unwrap()[0].container_port, 6379);
    }

    #[test]
    fn no_rdb_uses_default_entrypoint() {
        let pod = build_cache_pod(ctx(None));
        let c = &pod.spec.unwrap().containers[0];
        assert!(c.command.is_none());
        assert!(c.env.is_none());
    }

    #[test]
    fn rdb_restore_overrides_command_and_sets_env() {
        let url = "http://fc:4566/_fakecloud/elasticache/_internal/rdb/p";
        let pod = build_cache_pod(ctx(Some(url)));
        let c = &pod.spec.unwrap().containers[0];
        let cmd = c.command.as_ref().unwrap();
        assert_eq!(cmd.len(), 3);
        assert_eq!(cmd[0], "sh");
        assert_eq!(cmd[1], "-c");
        let script = &cmd[2];
        // String-literal continuations must fold the script onto one line.
        assert!(!script.contains('\n'), "script should be one line: {script}");
        assert!(script.contains("wget"), "should fetch rdb: {script}");
        assert!(script.contains("/data/dump.rdb"));
        assert!(script.contains("exec redis-server"));
        // The token is referenced via env, never inlined into the script.
        assert!(script.contains("$FAKECLOUD_RDB_TOKEN"));
        assert!(!script.contains("secret-token"));
        let env = c.env.as_ref().unwrap();
        assert!(env
            .iter()
            .any(|e| e.name == "FAKECLOUD_RDB_URL" && e.value.as_deref() == Some(url)));
        assert!(
            env.iter()
                .any(|e| e.name == "FAKECLOUD_RDB_TOKEN"
                    && e.value.as_deref() == Some("secret-token"))
        );
    }

    #[test]
    fn restart_policy_never() {
        let pod = build_cache_pod(ctx(None));
        assert_eq!(pod.spec.unwrap().restart_policy.as_deref(), Some("Never"));
    }

    #[test]
    fn pull_secret_attached_when_set() {
        let mut c = ctx(None);
        c.pull_secret = Some("reg-secret");
        let pod = build_cache_pod(c);
        let secrets = pod.spec.unwrap().image_pull_secrets.unwrap();
        assert_eq!(secrets[0].name, "reg-secret");
    }

    #[test]
    fn no_pull_secret_by_default() {
        let pod = build_cache_pod(ctx(None));
        assert!(pod.spec.unwrap().image_pull_secrets.is_none());
    }

    #[test]
    fn pod_config_base_applies_to_built_pod() {
        use std::collections::BTreeMap;
        // The global + service env base (resolved at from_env) is applied
        // to every cache Pod in spawn_pod_bytes; this asserts the apply
        // contract over a built cache Pod.
        let mut pod = build_cache_pod(ctx(None));
        let cfg = K8sPodConfig {
            node_selector: BTreeMap::from([("pool".to_string(), "cache".to_string())]),
            annotations: BTreeMap::from([("team".to_string(), "platform".to_string())]),
            ..Default::default()
        };
        cfg.apply(&mut pod);
        let spec = pod.spec.unwrap();
        assert_eq!(
            spec.node_selector.unwrap().get("pool").map(String::as_str),
            Some("cache")
        );
        assert_eq!(
            pod.metadata
                .annotations
                .unwrap()
                .get("team")
                .map(String::as_str),
            Some("platform")
        );
    }

    #[test]
    fn pod_config_overrides_apply_to_built_pod() {
        use std::collections::BTreeMap;
        // Mirrors the spawn_pod_bytes wiring: ElastiCache-service base
        // merged with the resource's reserved-tag overrides, applied to the
        // built cache Pod.
        let mut pod = build_cache_pod(ctx(None));
        let base = K8sPodConfig {
            node_selector: BTreeMap::from([("pool".to_string(), "cache".to_string())]),
            ..Default::default()
        };
        let tags = BTreeMap::from([
            (
                "fakecloud-k8s/node-selector".to_string(),
                "pool=spot,disktype=ssd".to_string(),
            ),
            (
                "fakecloud-k8s/annotations".to_string(),
                "team=data".to_string(),
            ),
        ]);
        base.merge(K8sPodConfig::from_tags(&tags)).apply(&mut pod);

        let spec = pod.spec.unwrap();
        let sel = spec.node_selector.unwrap();
        // The per-instance tag overrides the base value for `pool`, and the
        // tag-only key `disktype` is added alongside it.
        assert_eq!(sel.get("pool").map(String::as_str), Some("spot"));
        assert_eq!(sel.get("disktype").map(String::as_str), Some("ssd"));
        assert_eq!(
            pod.metadata
                .annotations
                .unwrap()
                .get("team")
                .map(String::as_str),
            Some("data")
        );
    }
}