use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use k8s_openapi::api::core::v1::{
Container, ContainerPort, EnvVar, LocalObjectReference, Pod, PodSpec,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use parking_lot::RwLock;
use fakecloud_k8s::{labels, names, K8sClient, K8sEnv, K8sPodConfig};
use super::{BackendInitError, CacheEngineKind, CacheExec, RunningCacheContainer, RuntimeError};
/// Service identifier: stamped on every pod as the `labels::SERVICE` label value
/// and passed to `K8sClient::reap_stale`.
const SERVICE: &str = "elasticache";
/// Prefix handed to `names::pod_name` when naming cache pods.
const POD_PREFIX: &str = "fakecloud-ec";
/// Name of the single container in each cache pod; every `exec` call targets it.
const CONTAINER: &str = "cache";
/// RDB payloads staged for restore, keyed by the name of the pod that will fetch them.
///
/// Pods download their entry from
/// `<self_url>/_fakecloud/elasticache/_internal/rdb/<pod_name>`; presumably an HTTP
/// handler elsewhere reads this map to serve that route (verify against the router).
pub type PendingRdb = Arc<RwLock<HashMap<String, Vec<u8>>>>;
/// Kubernetes-backed ElastiCache runtime: each cache resource runs as a single pod
/// in the client's namespace.
#[derive(Clone)]
pub(super) struct K8sCache {
    /// Kubernetes API client bound to the configured namespace.
    client: K8sClient,
    /// Base URL of this fakecloud server, used to build RDB download URLs for pods
    /// (presumably reachable from inside the cluster — confirm with `K8sEnv`).
    self_url: String,
    /// Bearer token that restore pods send when downloading a staged RDB.
    internal_token: String,
    /// Optional image pull secret attached to every cache pod.
    pull_secret: Option<String>,
    /// Base pod customisation from `K8sPodConfig::resolved_base("FAKECLOUD_ELASTICACHE_K8S")`;
    /// per-resource tag settings are merged over it.
    pod_config: K8sPodConfig,
    /// Shared map of RDB payloads waiting to be downloaded by restore pods.
    pending_rdb: PendingRdb,
}
impl std::fmt::Debug for K8sCache {
    /// Shows only the namespace and the callback URL. All other fields, including
    /// the internal token and any staged RDB bytes, are omitted; the output ends
    /// with `..` to signal that.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let namespace = self.client.namespace();
        let mut out = f.debug_struct("K8sCache");
        out.field("namespace", &namespace);
        out.field("self_url", &self.self_url);
        out.finish_non_exhaustive()
    }
}
impl K8sCache {
    /// Builds the backend from the process environment and connects to the cluster.
    ///
    /// `server_port` is passed to `K8sEnv::from_env`. `internal_token` is stored and
    /// later given to restore pods as the bearer token for RDB downloads.
    ///
    /// # Errors
    ///
    /// Propagates failures from `K8sEnv::from_env` and `K8sPodConfig::resolved_base`.
    /// Returns `BackendInitError::Connect` if the Kubernetes client cannot connect.
    pub(super) async fn from_env(
        server_port: u16,
        internal_token: String,
    ) -> Result<Self, BackendInitError> {
        let env = K8sEnv::from_env(server_port)?;
        let pod_config = K8sPodConfig::resolved_base("FAKECLOUD_ELASTICACHE_K8S")?;
        let client = K8sClient::connect(env.namespace.clone())
            .await
            .map_err(|e| BackendInitError::Connect(e.to_string()))?;
        tracing::info!(
            namespace = %env.namespace,
            self_url = %env.self_url,
            "K8s ElastiCache backend initialized"
        );
        Ok(Self {
            client,
            self_url: env.self_url,
            internal_token,
            pull_secret: env.pull_secret,
            pod_config,
            pending_rdb: Arc::new(RwLock::new(HashMap::new())),
        })
    }

    /// Returns a shared handle to the staged-RDB map, which is keyed by pod name.
    pub(super) fn pending_rdb(&self) -> PendingRdb {
        self.pending_rdb.clone()
    }

    /// Starts a cache pod for `resource_id`.
    ///
    /// If `rdb_path` is given, the file is read and used to seed a Redis pod.
    /// `tags` may override the base pod configuration.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::ContainerStartFailed` if the RDB file cannot be read
    /// or the pod fails to come up.
    pub(super) async fn spawn_pod(
        &self,
        resource_id: &str,
        engine: CacheEngineKind,
        rdb_path: Option<&str>,
        tags: &std::collections::BTreeMap<String, String>,
    ) -> Result<RunningCacheContainer, RuntimeError> {
        let rdb = match rdb_path {
            Some(path) => Some(tokio::fs::read(path).await.map_err(|e| {
                RuntimeError::ContainerStartFailed(format!("reading snapshot rdb {path}: {e}"))
            })?),
            None => None,
        };
        self.spawn_pod_bytes(resource_id, engine, rdb, tags).await
    }

    /// Shared spawn path for `spawn_pod` and `reboot_pod`.
    ///
    /// Stages `rdb` (Redis only; otherwise the bytes are discarded), builds and
    /// customises the pod, then launches it. The staged entry is removed from
    /// `pending_rdb` when this future finishes. That also happens if the future is
    /// dropped before completion or unwinds from a panic, so a cancelled spawn
    /// cannot leak the payload.
    async fn spawn_pod_bytes(
        &self,
        resource_id: &str,
        engine: CacheEngineKind,
        rdb: Option<Vec<u8>>,
        tags: &std::collections::BTreeMap<String, String>,
    ) -> Result<RunningCacheContainer, RuntimeError> {
        // Drop guard that removes one staged RDB entry. Running the removal in
        // `Drop` covers normal return, early cancellation, and panics alike.
        struct StagedRdbCleanup<'a> {
            pending: &'a PendingRdb,
            pod_name: &'a str,
        }
        impl Drop for StagedRdbCleanup<'_> {
            fn drop(&mut self) {
                self.pending.write().remove(self.pod_name);
            }
        }

        let pod_name = names::pod_name(POD_PREFIX, resource_id, resource_id);
        let port = engine.port();
        let rdb_url = match (engine, rdb) {
            (CacheEngineKind::Redis, Some(bytes)) => {
                self.pending_rdb.write().insert(pod_name.clone(), bytes);
                Some(format!(
                    "{}/_fakecloud/elasticache/_internal/rdb/{}",
                    self.self_url.trim_end_matches('/'),
                    pod_name
                ))
            }
            _ => None,
        };
        // Removal happens once `launch` resolves. The restore script downloads the
        // RDB before `exec redis-server`, so a successful TCP readiness check
        // implies the pod no longer needs the entry.
        let _staged_rdb = StagedRdbCleanup {
            pending: &self.pending_rdb,
            pod_name: &pod_name,
        };

        let mut pod = build_cache_pod(CachePodContext {
            pod_name: &pod_name,
            namespace: self.client.namespace(),
            instance_id: self.client.instance_id(),
            resource_id,
            image: engine.image(),
            port,
            rdb_url: rdb_url.as_deref(),
            internal_token: &self.internal_token,
            pull_secret: self.pull_secret.as_deref(),
        });
        // Tag-derived settings are merged over the base config.
        self.pod_config
            .clone()
            .merge(K8sPodConfig::from_tags(tags))
            .apply(&mut pod);
        self.launch(&pod, &pod_name, port, engine).await
    }

    /// Creates `pod` and waits for it to become usable.
    ///
    /// It waits up to 90 s for a pod IP, then up to 30 s for `port` to accept TCP
    /// connections. If either wait fails, the pod is deleted before the error is
    /// returned. On success the pod IP and `port` are reported as the endpoint.
    async fn launch(
        &self,
        pod: &Pod,
        pod_name: &str,
        port: u16,
        engine: CacheEngineKind,
    ) -> Result<RunningCacheContainer, RuntimeError> {
        self.client
            .create_pod(pod)
            .await
            .map_err(|e| RuntimeError::ContainerStartFailed(format!("create cache pod: {e}")))?;
        let pod_ip = match self
            .client
            .wait_for_pod_ip(pod_name, Duration::from_secs(90))
            .await
        {
            Ok(ip) => ip,
            Err(e) => {
                self.client.delete_pod(pod_name).await;
                return Err(RuntimeError::ContainerStartFailed(e.to_string()));
            }
        };
        if let Err(e) = K8sClient::wait_for_tcp(&pod_ip, port, Duration::from_secs(30)).await {
            self.client.delete_pod(pod_name).await;
            return Err(RuntimeError::ContainerStartFailed(format!(
                "cache pod {pod_name} ({pod_ip}:{port}) not ready: {e}"
            )));
        }
        Ok(RunningCacheContainer {
            container_id: pod_name.to_string(),
            host_port: port,
            endpoint_address: pod_ip,
            endpoint_port: port,
            engine,
        })
    }

    /// Deletes the pod named `pod_name` via the Kubernetes client.
    pub(super) async fn delete_pod(&self, pod_name: &str) {
        self.client.delete_pod(pod_name).await;
    }

    /// Runs `redis-cli <redis_args...>` inside the pod's cache container.
    ///
    /// A non-zero exit is reported through `CacheExec::success`, not as an error.
    /// `Err` is returned only when the exec call itself fails.
    pub(super) async fn exec_redis(
        &self,
        pod_name: &str,
        redis_args: &[String],
    ) -> Result<CacheExec, RuntimeError> {
        let mut cmd: Vec<&str> = vec!["redis-cli"];
        cmd.extend(redis_args.iter().map(String::as_str));
        let out = self
            .client
            .exec(pod_name, Some(CONTAINER), &cmd)
            .await
            .map_err(|e| RuntimeError::ContainerStartFailed(format!("exec redis-cli: {e}")))?;
        Ok(CacheExec {
            success: out.success(),
            stdout: out.stdout,
            stderr: out.stderr.into_bytes(),
        })
    }

    /// Runs `redis-cli SAVE` in the pod and copies `/data/dump.rdb` to `dest_path`
    /// on the local filesystem.
    ///
    /// Assumes the server's data directory is `/data`; the restore script writes
    /// there too.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::ContainerStartFailed` if either exec fails or exits
    /// unsuccessfully, or if the local write fails.
    pub(super) async fn dump_rdb(
        &self,
        pod_name: &str,
        dest_path: &str,
    ) -> Result<(), RuntimeError> {
        let save = self
            .client
            .exec(pod_name, Some(CONTAINER), &["redis-cli", "SAVE"])
            .await
            .map_err(|e| RuntimeError::ContainerStartFailed(format!("exec SAVE: {e}")))?;
        if !save.success() {
            return Err(RuntimeError::ContainerStartFailed(format!(
                "redis SAVE failed: {}",
                save.stderr.trim()
            )));
        }
        let cat = self
            .client
            .exec(pod_name, Some(CONTAINER), &["cat", "/data/dump.rdb"])
            .await
            .map_err(|e| RuntimeError::ContainerStartFailed(format!("exec cat rdb: {e}")))?;
        if !cat.success() {
            return Err(RuntimeError::ContainerStartFailed(format!(
                "reading dump.rdb from pod failed: {}",
                cat.stderr.trim()
            )));
        }
        tokio::fs::write(dest_path, &cat.stdout)
            .await
            .map_err(|e| RuntimeError::ContainerStartFailed(format!("writing {dest_path}: {e}")))?;
        Ok(())
    }

    /// Replaces the running pod with a fresh one.
    ///
    /// For Redis, a best-effort snapshot of the live dataset is captured first and
    /// used to seed the new pod. If no snapshot can be captured, a warning is logged
    /// and the new pod starts without restored data. The old pod is deleted before
    /// the replacement is spawned.
    pub(super) async fn reboot_pod(
        &self,
        resource_id: &str,
        running: &RunningCacheContainer,
        tags: &std::collections::BTreeMap<String, String>,
    ) -> Result<RunningCacheContainer, RuntimeError> {
        let preserved = if matches!(running.engine, CacheEngineKind::Redis) {
            let snapshot = self.snapshot_live_rdb(&running.container_id).await;
            if snapshot.is_none() {
                // Previously silent: make data loss on reboot visible to operators.
                tracing::warn!(
                    pod = %running.container_id,
                    "could not capture redis data before reboot; new pod will start without it"
                );
            }
            snapshot
        } else {
            None
        };
        self.client.delete_pod(&running.container_id).await;
        self.spawn_pod_bytes(resource_id, running.engine, preserved, tags)
            .await
    }

    /// Best-effort snapshot of a live Redis pod's dataset.
    ///
    /// It issues `redis-cli SAVE` and ignores the result, then returns the bytes of
    /// `/data/dump.rdb`. It returns `None` if that read errors, exits unsuccessfully,
    /// or yields no bytes.
    async fn snapshot_live_rdb(&self, pod_name: &str) -> Option<Vec<u8>> {
        let _ = self
            .client
            .exec(pod_name, Some(CONTAINER), &["redis-cli", "SAVE"])
            .await;
        let cat = self
            .client
            .exec(pod_name, Some(CONTAINER), &["cat", "/data/dump.rdb"])
            .await
            .ok()?;
        if cat.success() && !cat.stdout.is_empty() {
            Some(cat.stdout)
        } else {
            None
        }
    }

    /// Delegates to `K8sClient::reap_stale` for this service (`SERVICE`).
    ///
    /// Presumably this removes leftover pods carrying this service's label; see
    /// `K8sClient` to confirm.
    pub(super) async fn reap_stale(&self) {
        self.client.reap_stale(SERVICE).await;
    }
}
/// Borrowed inputs for `build_cache_pod`.
struct CachePodContext<'a> {
    /// Becomes the pod's `metadata.name`.
    pod_name: &'a str,
    /// Becomes the pod's `metadata.namespace`.
    namespace: &'a str,
    /// Value of the `labels::INSTANCE` ownership label.
    instance_id: &'a str,
    /// ElastiCache resource id; its `names::label_safe` form is stored under the
    /// `fakecloud-elasticache` label.
    resource_id: &'a str,
    /// Container image to run.
    image: &'a str,
    /// Engine port, declared as the container port.
    port: u16,
    /// When set, the container downloads an RDB from this URL before starting
    /// `redis-server`.
    rdb_url: Option<&'a str>,
    /// Bearer token for the RDB download; only placed in the pod when `rdb_url` is set.
    internal_token: &'a str,
    /// Optional image pull secret name.
    pull_secret: Option<&'a str>,
}
/// Assembles the single-container cache pod described by `ctx`.
///
/// The pod gets the ownership labels (managed-by, instance, service) plus a
/// `fakecloud-elasticache` label holding a label-safe form of the resource id.
/// It never restarts.
///
/// When `ctx.rdb_url` is set, the container's command becomes a `sh -c` script.
/// The script downloads the RDB to `/data/dump.rdb`, authenticating with the
/// `FAKECLOUD_RDB_TOKEN` env var, and then `exec`s `redis-server`. Otherwise
/// `command` and `env` are left unset, so the image defaults apply.
fn build_cache_pod(ctx: CachePodContext<'_>) -> Pod {
    // Rust's trailing-backslash string continuation joins these lines into one
    // command line. The URL and token are read from env vars so the secret never
    // appears in the command itself.
    const RDB_RESTORE_SCRIPT: &str = "set -e; \
        wget -q --header=\"authorization: Bearer $FAKECLOUD_RDB_TOKEN\" \
        -O /data/dump.rdb \"$FAKECLOUD_RDB_URL\"; \
        exec redis-server";

    let pod_labels = std::collections::BTreeMap::from([
        (
            labels::MANAGED_BY.to_string(),
            labels::MANAGED_BY_VALUE.to_string(),
        ),
        (labels::INSTANCE.to_string(), ctx.instance_id.to_string()),
        (labels::SERVICE.to_string(), SERVICE.to_string()),
        (
            "fakecloud-elasticache".to_string(),
            names::label_safe(ctx.resource_id),
        ),
    ]);

    let command = ctx.rdb_url.map(|_| {
        vec![
            "sh".to_string(),
            "-c".to_string(),
            RDB_RESTORE_SCRIPT.to_string(),
        ]
    });
    let env = ctx.rdb_url.map(|url| {
        vec![
            EnvVar {
                name: "FAKECLOUD_RDB_URL".to_string(),
                value: Some(url.to_string()),
                value_from: None,
            },
            EnvVar {
                name: "FAKECLOUD_RDB_TOKEN".to_string(),
                value: Some(ctx.internal_token.to_string()),
                value_from: None,
            },
        ]
    });

    let exposed_port = ContainerPort {
        container_port: i32::from(ctx.port),
        ..ContainerPort::default()
    };
    let container = Container {
        name: CONTAINER.to_string(),
        image: Some(ctx.image.to_string()),
        command,
        env,
        ports: Some(vec![exposed_port]),
        ..Container::default()
    };

    let image_pull_secrets = ctx.pull_secret.map(|secret| {
        vec![LocalObjectReference {
            name: secret.to_string(),
        }]
    });

    let metadata = ObjectMeta {
        name: Some(ctx.pod_name.to_string()),
        namespace: Some(ctx.namespace.to_string()),
        labels: Some(pod_labels),
        ..ObjectMeta::default()
    };
    let spec = PodSpec {
        restart_policy: Some("Never".to_string()),
        containers: vec![container],
        image_pull_secrets,
        ..PodSpec::default()
    };
    Pod {
        metadata,
        spec: Some(spec),
        ..Pod::default()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Baseline Redis pod context; individual tests tweak fields as needed.
    fn ctx<'a>(rdb_url: Option<&'a str>) -> CachePodContext<'a> {
        CachePodContext {
            pod_name: "fakecloud-ec-mycache-abc123",
            namespace: "fakecloud",
            instance_id: "fakecloud-1234",
            resource_id: "My_Cache",
            image: "redis:7-alpine",
            port: 6379,
            rdb_url,
            internal_token: "secret-token",
            pull_secret: None,
        }
    }

    #[test]
    fn pod_has_ownership_labels() {
        let pod = build_cache_pod(ctx(None));
        let l = pod.metadata.labels.unwrap();
        assert_eq!(l.get(labels::MANAGED_BY).unwrap(), labels::MANAGED_BY_VALUE);
        assert_eq!(l.get(labels::SERVICE).unwrap(), "elasticache");
        assert_eq!(l.get(labels::INSTANCE).unwrap(), "fakecloud-1234");
        assert_eq!(l.get("fakecloud-elasticache").unwrap(), "my-cache");
    }

    #[test]
    fn pod_metadata_has_name_and_namespace() {
        let pod = build_cache_pod(ctx(None));
        assert_eq!(
            pod.metadata.name.as_deref(),
            Some("fakecloud-ec-mycache-abc123")
        );
        assert_eq!(pod.metadata.namespace.as_deref(), Some("fakecloud"));
    }

    #[test]
    fn container_exposes_engine_port_and_image() {
        let pod = build_cache_pod(ctx(None));
        let c = &pod.spec.unwrap().containers[0];
        assert_eq!(c.image.as_deref(), Some("redis:7-alpine"));
        assert_eq!(c.ports.as_ref().unwrap()[0].container_port, 6379);
    }

    #[test]
    fn container_is_named_for_exec() {
        // exec_redis / dump_rdb / snapshot_live_rdb all target this container name.
        let pod = build_cache_pod(ctx(None));
        assert_eq!(pod.spec.unwrap().containers[0].name, CONTAINER);
    }

    #[test]
    fn no_rdb_uses_default_entrypoint() {
        let pod = build_cache_pod(ctx(None));
        let c = &pod.spec.unwrap().containers[0];
        assert!(c.command.is_none());
        assert!(c.env.is_none());
    }

    #[test]
    fn rdb_restore_overrides_command_and_sets_env() {
        let pod = build_cache_pod(ctx(Some(
            "http://fc:4566/_fakecloud/elasticache/_internal/rdb/p",
        )));
        let c = &pod.spec.unwrap().containers[0];
        let script = c.command.as_ref().unwrap().last().unwrap();
        assert!(script.contains("wget"), "should fetch rdb: {script}");
        assert!(script.contains("/data/dump.rdb"));
        assert!(script.contains("exec redis-server"));
        assert!(script.contains("$FAKECLOUD_RDB_TOKEN"));
        assert!(!script.contains("secret-token"));
        let env = c.env.as_ref().unwrap();
        assert!(env.iter().any(|e| e.name == "FAKECLOUD_RDB_URL"));
        assert!(
            env.iter()
                .any(|e| e.name == "FAKECLOUD_RDB_TOKEN"
                    && e.value.as_deref() == Some("secret-token"))
        );
    }

    #[test]
    fn rdb_restore_fetches_before_starting_server() {
        // K8sCache::launch treats "port accepts TCP" as "RDB consumed", which only
        // holds if a failed download aborts the script and the fetch precedes the
        // server start.
        let pod = build_cache_pod(ctx(Some("http://fc:4566/x")));
        let c = &pod.spec.unwrap().containers[0];
        let cmd = c.command.as_ref().unwrap();
        assert_eq!(cmd.len(), 3);
        assert_eq!(cmd[0], "sh");
        assert_eq!(cmd[1], "-c");
        let script = &cmd[2];
        assert!(script.starts_with("set -e;"), "download failure must abort: {script}");
        let fetch = script.find("wget").unwrap();
        let start = script.find("exec redis-server").unwrap();
        assert!(fetch < start, "rdb must be fetched before redis starts: {script}");
    }

    #[test]
    fn restart_policy_never() {
        let pod = build_cache_pod(ctx(None));
        assert_eq!(pod.spec.unwrap().restart_policy.as_deref(), Some("Never"));
    }

    #[test]
    fn pull_secret_absent_by_default() {
        let pod = build_cache_pod(ctx(None));
        assert!(pod.spec.unwrap().image_pull_secrets.is_none());
    }

    #[test]
    fn pull_secret_attached_when_set() {
        let mut c = ctx(None);
        c.pull_secret = Some("reg-secret");
        let pod = build_cache_pod(c);
        let secrets = pod.spec.unwrap().image_pull_secrets.unwrap();
        assert_eq!(secrets[0].name, "reg-secret");
    }

    #[test]
    fn pod_config_base_applies_to_built_pod() {
        use std::collections::BTreeMap;
        let mut pod = build_cache_pod(ctx(None));
        let cfg = K8sPodConfig {
            node_selector: BTreeMap::from([("pool".to_string(), "cache".to_string())]),
            annotations: BTreeMap::from([("team".to_string(), "platform".to_string())]),
            ..Default::default()
        };
        cfg.apply(&mut pod);
        let spec = pod.spec.unwrap();
        assert_eq!(
            spec.node_selector.unwrap().get("pool").map(String::as_str),
            Some("cache")
        );
        assert_eq!(
            pod.metadata
                .annotations
                .unwrap()
                .get("team")
                .map(String::as_str),
            Some("platform")
        );
    }

    #[test]
    fn pod_config_overrides_apply_to_built_pod() {
        use std::collections::BTreeMap;
        let mut pod = build_cache_pod(ctx(None));
        let base = K8sPodConfig {
            node_selector: BTreeMap::from([("pool".to_string(), "cache".to_string())]),
            ..Default::default()
        };
        let tags = BTreeMap::from([
            (
                "fakecloud-k8s/node-selector".to_string(),
                "pool=spot,disktype=ssd".to_string(),
            ),
            (
                "fakecloud-k8s/annotations".to_string(),
                "team=data".to_string(),
            ),
        ]);
        base.merge(K8sPodConfig::from_tags(&tags)).apply(&mut pod);
        let spec = pod.spec.unwrap();
        let sel = spec.node_selector.unwrap();
        assert_eq!(sel.get("pool").map(String::as_str), Some("spot"));
        assert_eq!(sel.get("disktype").map(String::as_str), Some("ssd"));
        assert_eq!(
            pod.metadata
                .annotations
                .unwrap()
                .get("team")
                .map(String::as_str),
            Some("data")
        );
    }
}