mod k8s;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
use parking_lot::RwLock;
pub use k8s::PendingRdb;
/// Which cache engine a resource runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheEngineKind {
    Redis,
    Memcached,
}

impl CacheEngineKind {
    /// Container image and in-container listening port for this engine, kept
    /// together so the two can never drift apart.
    const fn spec(self) -> (&'static str, u16) {
        match self {
            Self::Redis => ("redis:7-alpine", 6379),
            Self::Memcached => ("memcached:1.6-alpine", 11211),
        }
    }

    /// Image reference used to create the engine's container.
    fn image(self) -> &'static str {
        let (image, _) = self.spec();
        image
    }

    /// Port the engine listens on inside its container.
    fn port(self) -> u16 {
        let (_, port) = self.spec();
        port
    }
}
/// A cache engine instance that this runtime started and is tracking.
#[derive(Debug, Clone)]
pub struct RunningCacheContainer {
    /// Backend handle for the instance: the id printed by the container CLI's
    /// `create` (Docker), or the identifier passed to `delete_pod` (Kubernetes).
    pub container_id: String,
    /// Host-side port. For Docker this is the port published for the engine's
    /// in-container port. NOTE(review): the Kubernetes meaning is defined in
    /// the `k8s` module — confirm there.
    pub host_port: u16,
    /// Address clients connect to (Docker: the detected sibling host).
    pub endpoint_address: String,
    /// Port clients connect to (Docker: equal to `host_port`).
    pub endpoint_port: u16,
    /// Engine running in this instance.
    pub engine: CacheEngineKind,
}
/// Captured result of running `redis-cli` inside a cache instance.
#[derive(Debug, Clone)]
pub struct CacheExec {
    /// Whether the command succeeded (Docker: exit status of the `exec`
    /// process; for Kubernetes see the `k8s` module).
    pub success: bool,
    /// Raw standard output bytes.
    pub stdout: Vec<u8>,
    /// Raw standard error bytes.
    pub stderr: Vec<u8>,
}
/// Errors reported while driving cache instances.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
    /// Also returned by `ElastiCacheRuntime` when a `resource_id` has no
    /// tracked instance, not only when the runtime itself is missing.
    #[error("container runtime is unavailable")]
    Unavailable,
    /// Generic backend failure carrying a description. Despite the name, the
    /// Docker backend also uses it for failed `exec`, `cp`, `restart` and
    /// port lookups.
    #[error("container failed to start: {0}")]
    ContainerStartFailed(String),
}
/// Errors that can occur while building the Kubernetes backend
/// (returned by `ElastiCacheRuntime::new_k8s`).
#[derive(Debug, thiserror::Error)]
pub enum BackendInitError {
    /// `fakecloud_k8s::K8sEnvError`, forwarded transparently.
    #[error(transparent)]
    Env(#[from] fakecloud_k8s::K8sEnvError),
    /// `fakecloud_k8s::K8sPodConfigError`, forwarded transparently.
    #[error(transparent)]
    PodConfig(#[from] fakecloud_k8s::K8sPodConfigError),
    /// Connecting to the cluster failed; carries a description of the cause.
    #[error("failed to connect to the Kubernetes cluster: {0}")]
    Connect(String),
}
/// The backend that actually runs cache instances.
#[derive(Debug, Clone)]
enum CacheBackend {
    /// Containers driven through a locally detected container CLI.
    Docker(DockerCache),
    /// Pods managed by the `k8s` submodule.
    K8s(k8s::K8sCache),
}
/// Runs ElastiCache Redis/Memcached instances on a backend and tracks them
/// by resource id.
///
/// Clones share the same tracking map, because it sits behind an `Arc`.
#[derive(Debug, Clone)]
pub struct ElastiCacheRuntime {
    backend: CacheBackend,
    /// resource id -> instance currently running for it.
    containers: Arc<RwLock<HashMap<String, RunningCacheContainer>>>,
}
impl ElastiCacheRuntime {
pub fn new() -> Option<Self> {
let cli = fakecloud_core::container_net::detect_container_cli()?;
let net = fakecloud_core::container_net::HostNetworking::detect(&cli);
Some(Self {
backend: CacheBackend::Docker(DockerCache {
cli,
net,
instance_id: format!("fakecloud-{}", std::process::id()),
}),
containers: Arc::new(RwLock::new(HashMap::new())),
})
}
pub async fn new_k8s(
server_port: u16,
internal_token: String,
) -> Result<Self, BackendInitError> {
let cache = k8s::K8sCache::from_env(server_port, internal_token).await?;
Ok(Self {
backend: CacheBackend::K8s(cache),
containers: Arc::new(RwLock::new(HashMap::new())),
})
}
pub fn cli_name(&self) -> &str {
match &self.backend {
CacheBackend::Docker(d) => &d.cli,
CacheBackend::K8s(_) => "kubernetes",
}
}
pub fn pending_rdb(&self) -> Option<PendingRdb> {
match &self.backend {
CacheBackend::K8s(k) => Some(k.pending_rdb()),
CacheBackend::Docker(_) => None,
}
}
pub fn endpoint_host(&self) -> &str {
match &self.backend {
CacheBackend::Docker(d) => &d.net.sibling_host,
CacheBackend::K8s(_) => "127.0.0.1",
}
}
pub async fn ensure_redis(
&self,
resource_id: &str,
rdb_path: Option<&str>,
tags: &BTreeMap<String, String>,
) -> Result<RunningCacheContainer, RuntimeError> {
let running = match &self.backend {
CacheBackend::Docker(d) => {
d.spawn_container(resource_id, CacheEngineKind::Redis, rdb_path)
.await?
}
CacheBackend::K8s(k) => {
k.spawn_pod(resource_id, CacheEngineKind::Redis, rdb_path, tags)
.await?
}
};
self.containers
.write()
.insert(resource_id.to_string(), running.clone());
Ok(running)
}
pub async fn ensure_memcached(
&self,
resource_id: &str,
tags: &BTreeMap<String, String>,
) -> Result<RunningCacheContainer, RuntimeError> {
let running = match &self.backend {
CacheBackend::Docker(d) => {
d.spawn_container(resource_id, CacheEngineKind::Memcached, None)
.await?
}
CacheBackend::K8s(k) => {
k.spawn_pod(resource_id, CacheEngineKind::Memcached, None, tags)
.await?
}
};
self.containers
.write()
.insert(resource_id.to_string(), running.clone());
Ok(running)
}
pub async fn stop_container(&self, resource_id: &str) {
let container = self.containers.write().remove(resource_id);
if let Some(container) = container {
match &self.backend {
CacheBackend::Docker(d) => d.remove_container(&container.container_id).await,
CacheBackend::K8s(k) => k.delete_pod(&container.container_id).await,
}
}
}
pub async fn remove_data_volume(&self, resource_id: &str) {
if let CacheBackend::Docker(d) = &self.backend {
d.remove_data_volume(resource_id).await;
}
}
pub async fn restart_container(
&self,
resource_id: &str,
tags: &BTreeMap<String, String>,
) -> Result<(), RuntimeError> {
let running = {
let containers = self.containers.read();
containers.get(resource_id).cloned()
};
let running = running.ok_or(RuntimeError::Unavailable)?;
match &self.backend {
CacheBackend::Docker(d) => d.restart_container(&running.container_id).await,
CacheBackend::K8s(k) => {
let updated = k.reboot_pod(resource_id, &running, tags).await?;
self.containers
.write()
.insert(resource_id.to_string(), updated);
Ok(())
}
}
}
pub async fn exec_redis(
&self,
resource_id: &str,
redis_args: &[String],
) -> Result<CacheExec, RuntimeError> {
let container_id = {
let containers = self.containers.read();
containers
.get(resource_id)
.map(|c| c.container_id.clone())
.ok_or(RuntimeError::Unavailable)?
};
match &self.backend {
CacheBackend::Docker(d) => d.exec_redis(&container_id, redis_args).await,
CacheBackend::K8s(k) => k.exec_redis(&container_id, redis_args).await,
}
}
pub async fn dump_rdb(&self, resource_id: &str, dest_path: &str) -> Result<(), RuntimeError> {
let container_id = {
let containers = self.containers.read();
containers
.get(resource_id)
.map(|c| c.container_id.clone())
.ok_or(RuntimeError::Unavailable)?
};
match &self.backend {
CacheBackend::Docker(d) => d.dump_rdb(&container_id, dest_path).await,
CacheBackend::K8s(k) => k.dump_rdb(&container_id, dest_path).await,
}
}
pub async fn stop_all(&self) {
let containers: Vec<RunningCacheContainer> = {
let mut containers = self.containers.write();
containers.drain().map(|(_, c)| c).collect()
};
for c in containers {
match &self.backend {
CacheBackend::Docker(d) => d.remove_container(&c.container_id).await,
CacheBackend::K8s(k) => k.delete_pod(&c.container_id).await,
}
}
}
pub async fn reap_stale(&self) {
if let CacheBackend::K8s(k) = &self.backend {
k.reap_stale().await;
}
}
}
/// Backend that drives the container CLI found by `detect_container_cli`.
#[derive(Debug, Clone)]
struct DockerCache {
    /// Container CLI executable invoked for every operation.
    cli: String,
    /// Detected host networking; `sibling_host` is used as the endpoint
    /// address and as the host for readiness probes.
    net: fakecloud_core::container_net::HostNetworking,
    /// Value of the `fakecloud-instance` label put on every created container.
    instance_id: String,
}
impl DockerCache {
    /// Total time a freshly (re)started engine gets to answer its readiness probe.
    const READY_TIMEOUT: Duration = Duration::from_secs(20);
    /// Pause before each readiness probe.
    const READY_POLL_INTERVAL: Duration = Duration::from_millis(500);
    /// Bound on a single probe (connect, request and reply), so one hung
    /// attempt cannot consume the whole readiness budget.
    const READY_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(2);

    /// Runs the container CLI with `args` and captures its output.
    ///
    /// # Errors
    /// [`RuntimeError::ContainerStartFailed`] only when the CLI process cannot
    /// be spawned; a non-zero exit status is left for the caller to inspect.
    async fn run_cli<I, S>(&self, args: I) -> Result<std::process::Output, RuntimeError>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<std::ffi::OsStr>,
    {
        tokio::process::Command::new(&self.cli)
            .args(args)
            .output()
            .await
            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))
    }

    /// Trimmed, lossily decoded stderr of a finished CLI invocation.
    fn stderr_text(output: &std::process::Output) -> String {
        String::from_utf8_lossy(&output.stderr).trim().to_string()
    }

    /// Creates, stages, starts and health-checks a container for `engine`.
    ///
    /// Redis containers get the per-resource data volume mounted at `/data`.
    /// When `rdb_path` is given, the file is copied to `/data/dump.rdb`
    /// before the container is started.
    ///
    /// # Errors
    /// [`RuntimeError::ContainerStartFailed`] for any failing step. Once the
    /// container has been created, every failure path removes it again.
    async fn spawn_container(
        &self,
        resource_id: &str,
        engine: CacheEngineKind,
        rdb_path: Option<&str>,
    ) -> Result<RunningCacheContainer, RuntimeError> {
        let container_id = self.create_container(resource_id, engine).await?;
        match self.start_created(&container_id, engine, rdb_path).await {
            Ok(host_port) => Ok(RunningCacheContainer {
                container_id,
                host_port,
                endpoint_address: self.net.sibling_host.clone(),
                endpoint_port: host_port,
                engine,
            }),
            Err(error) => {
                // Also covers the CLI failing to spawn at all, so the created
                // container is never left behind.
                self.remove_container(&container_id).await;
                Err(error)
            }
        }
    }

    /// Runs `<cli> create` for `engine` and returns the new container id.
    async fn create_container(
        &self,
        resource_id: &str,
        engine: CacheEngineKind,
    ) -> Result<String, RuntimeError> {
        let mut args: Vec<String> = vec![
            "create".to_string(),
            "-p".to_string(),
            // No host port given; the published port is looked up after start.
            format!(":{}", engine.port()),
            "--label".to_string(),
            format!("fakecloud-elasticache={resource_id}"),
            "--label".to_string(),
            format!("fakecloud-instance={}", self.instance_id),
        ];
        if matches!(engine, CacheEngineKind::Redis) {
            args.push("-v".to_string());
            args.push(format!("{}:/data", data_volume_name(resource_id)));
        }
        args.push(engine.image().to_string());
        let output = self.run_cli(&args).await?;
        if !output.status.success() {
            return Err(RuntimeError::ContainerStartFailed(Self::stderr_text(
                &output,
            )));
        }
        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
    }

    /// Stages the optional snapshot, starts the container, resolves its
    /// published port and waits for the engine to answer. Returns the host port.
    async fn start_created(
        &self,
        container_id: &str,
        engine: CacheEngineKind,
        rdb_path: Option<&str>,
    ) -> Result<u16, RuntimeError> {
        if let Some(path) = rdb_path {
            let target = format!("{container_id}:/data/dump.rdb");
            let cp_output = self.run_cli(["cp", path, target.as_str()]).await?;
            if !cp_output.status.success() {
                return Err(RuntimeError::ContainerStartFailed(format!(
                    "failed to stage snapshot rdb into container: {}",
                    Self::stderr_text(&cp_output)
                )));
            }
        }
        let start_output = self.run_cli(["start", container_id]).await?;
        if !start_output.status.success() {
            return Err(RuntimeError::ContainerStartFailed(format!(
                "container start failed: {}",
                Self::stderr_text(&start_output)
            )));
        }
        let host_port = self.lookup_port(container_id, engine.port()).await?;
        match engine {
            CacheEngineKind::Redis => self.wait_for_redis(host_port).await?,
            CacheEngineKind::Memcached => self.wait_for_memcached(host_port).await?,
        }
        Ok(host_port)
    }

    /// Runs `<cli> restart` on the container.
    ///
    /// # Errors
    /// [`RuntimeError::ContainerStartFailed`] with the CLI's stderr on failure.
    async fn restart_container(&self, container_id: &str) -> Result<(), RuntimeError> {
        let output = self.run_cli(["restart", container_id]).await?;
        if output.status.success() {
            Ok(())
        } else {
            Err(RuntimeError::ContainerStartFailed(Self::stderr_text(
                &output,
            )))
        }
    }

    /// Runs `redis-cli <redis_args>` inside the container.
    ///
    /// A non-zero exit is reported through [`CacheExec::success`]; only a CLI
    /// that cannot be spawned is an error.
    async fn exec_redis(
        &self,
        container_id: &str,
        redis_args: &[String],
    ) -> Result<CacheExec, RuntimeError> {
        let mut args = vec![
            "exec".to_string(),
            container_id.to_string(),
            "redis-cli".to_string(),
        ];
        args.extend_from_slice(redis_args);
        let out = self.run_cli(&args).await?;
        Ok(CacheExec {
            success: out.status.success(),
            stdout: out.stdout,
            stderr: out.stderr,
        })
    }

    /// Runs `SAVE` in the container and copies `/data/dump.rdb` to `dest_path`.
    ///
    /// # Errors
    /// [`RuntimeError::ContainerStartFailed`] if `SAVE` fails or replies with
    /// anything but `OK`, or if the copy fails.
    async fn dump_rdb(&self, container_id: &str, dest_path: &str) -> Result<(), RuntimeError> {
        let save_output = self
            .run_cli(["exec", container_id, "redis-cli", "SAVE"])
            .await?;
        let save_reply = String::from_utf8_lossy(&save_output.stdout);
        // An error reply (e.g. a background save already running) must not
        // lead to copying a stale dump file, so require the literal `OK` too.
        if !save_output.status.success() || save_reply.trim() != "OK" {
            let stderr = Self::stderr_text(&save_output);
            let detail = if stderr.is_empty() {
                save_reply.trim().to_string()
            } else {
                stderr
            };
            return Err(RuntimeError::ContainerStartFailed(detail));
        }
        let source = format!("{container_id}:/data/dump.rdb");
        let cp_output = self.run_cli(["cp", source.as_str(), dest_path]).await?;
        if !cp_output.status.success() {
            return Err(RuntimeError::ContainerStartFailed(Self::stderr_text(
                &cp_output,
            )));
        }
        Ok(())
    }

    /// Resolves the host port published for `container_port`.
    ///
    /// `<cli> port` may print one mapping per address family (for example an
    /// IPv4 line followed by an IPv6 line); the first line is used.
    ///
    /// # Errors
    /// [`RuntimeError::ContainerStartFailed`] if the lookup fails or its
    /// output cannot be parsed.
    async fn lookup_port(
        &self,
        container_id: &str,
        container_port: u16,
    ) -> Result<u16, RuntimeError> {
        let port_arg = container_port.to_string();
        let output = self
            .run_cli(["port", container_id, port_arg.as_str()])
            .await?;
        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            return Err(RuntimeError::ContainerStartFailed(format!(
                "port lookup failed: {stderr}"
            )));
        }
        let stdout = String::from_utf8_lossy(&output.stdout);
        let mapping = stdout.trim();
        mapping
            .lines()
            .next()
            .and_then(|line| line.trim().rsplit(':').next())
            .and_then(|port| port.parse::<u16>().ok())
            .ok_or_else(|| {
                RuntimeError::ContainerStartFailed(format!(
                    "could not determine host port from '{mapping}'"
                ))
            })
    }

    /// Waits until Redis on `host_port` answers `PING` with a reply other
    /// than `-LOADING`.
    ///
    /// A bare TCP connect is not sufficient: a userland port forwarder can
    /// accept the connection before Redis listens, and Redis replies
    /// `-LOADING` while it is still reading a staged `dump.rdb`.
    async fn wait_for_redis(&self, host_port: u16) -> Result<(), RuntimeError> {
        let host: &str = &self.net.sibling_host;
        Self::poll_until_ready("redis", move || Self::probe_redis(host, host_port)).await
    }

    /// Waits until memcached on `host_port` answers `version` with `VERSION…`.
    async fn wait_for_memcached(&self, host_port: u16) -> Result<(), RuntimeError> {
        let host: &str = &self.net.sibling_host;
        Self::poll_until_ready("memcached", move || {
            Self::probe_memcached(host, host_port)
        })
        .await
    }

    /// Repeats `probe` (each attempt bounded by `READY_ATTEMPT_TIMEOUT`) until
    /// it returns `true` or `READY_TIMEOUT` has elapsed.
    async fn poll_until_ready<F, Fut>(engine_name: &str, mut probe: F) -> Result<(), RuntimeError>
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = bool>,
    {
        let deadline = tokio::time::Instant::now() + Self::READY_TIMEOUT;
        while tokio::time::Instant::now() < deadline {
            tokio::time::sleep(Self::READY_POLL_INTERVAL).await;
            if let Ok(true) = tokio::time::timeout(Self::READY_ATTEMPT_TIMEOUT, probe()).await {
                return Ok(());
            }
        }
        Err(RuntimeError::ContainerStartFailed(format!(
            "{engine_name} container did not become ready within {} seconds",
            Self::READY_TIMEOUT.as_secs()
        )))
    }

    /// One Redis readiness attempt: sends an inline `PING` and inspects the
    /// first complete reply line.
    async fn probe_redis(host: &str, port: u16) -> bool {
        use tokio::io::{AsyncReadExt, AsyncWriteExt};
        // The `(host, port)` form also accepts IPv6 literals, unlike "host:port".
        let Ok(mut stream) = tokio::net::TcpStream::connect((host, port)).await else {
            return false;
        };
        if stream.write_all(b"PING\r\n").await.is_err() {
            return false;
        }
        let mut buf = [0u8; 64];
        let n = match stream.read(&mut buf).await {
            Ok(n) => n,
            Err(_) => return false,
        };
        let reply = &buf[..n];
        let Some(end) = reply.windows(2).position(|w| w == b"\r\n") else {
            return false;
        };
        let line = &reply[..end];
        // Any status or error line shows Redis is parsing commands; only
        // `-LOADING` means the dataset is not available yet.
        matches!(line.first().copied(), Some(b'+' | b'-')) && !line.starts_with(b"-LOADING")
    }

    /// One memcached readiness attempt: sends `version` and expects `VERSION…`.
    async fn probe_memcached(host: &str, port: u16) -> bool {
        use tokio::io::{AsyncReadExt, AsyncWriteExt};
        let Ok(mut stream) = tokio::net::TcpStream::connect((host, port)).await else {
            return false;
        };
        if stream.write_all(b"version\r\n").await.is_err() {
            return false;
        }
        let mut buf = [0u8; 32];
        matches!(stream.read(&mut buf).await, Ok(n) if n > 0 && buf.starts_with(b"VERSION"))
    }

    /// Force-removes the container; best effort, failures are ignored.
    async fn remove_container(&self, container_id: &str) {
        let _ = self.run_cli(["rm", "-f", container_id]).await;
    }

    /// Force-removes the resource's data volume; best effort, failures are ignored.
    async fn remove_data_volume(&self, resource_id: &str) {
        let volume = data_volume_name(resource_id);
        let _ = self.run_cli(["volume", "rm", "-f", volume.as_str()]).await;
    }
}
/// Name of the named volume mounted at `/data` for a Redis resource.
///
/// Each `char` outside `[A-Za-z0-9_.-]` becomes a single `-`, and the result
/// is prefixed with `fakecloud-elasticache-data-`. The same `resource_id`
/// therefore always maps to the same volume.
fn data_volume_name(resource_id: &str) -> String {
    const PREFIX: &str = "fakecloud-elasticache-data-";
    let mut name = String::with_capacity(PREFIX.len() + resource_id.len());
    name.push_str(PREFIX);
    name.extend(resource_id.chars().map(|ch| match ch {
        'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '.' | '-' => ch,
        _ => '-',
    }));
    name
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn data_volume_name_is_stable_and_sanitized() {
        assert_eq!(
            data_volume_name("my-cache"),
            "fakecloud-elasticache-data-my-cache"
        );
        assert_eq!(
            data_volume_name("weird/id:1"),
            "fakecloud-elasticache-data-weird-id-1"
        );
    }

    #[test]
    fn data_volume_name_keeps_allowed_punctuation() {
        assert_eq!(
            data_volume_name("snap_v1.2"),
            "fakecloud-elasticache-data-snap_v1.2"
        );
    }

    #[test]
    fn data_volume_name_replaces_each_non_ascii_char_once() {
        // 'é' is two bytes in UTF-8 but one char, so it yields a single '-'.
        assert_eq!(
            data_volume_name("café cache"),
            "fakecloud-elasticache-data-caf--cache"
        );
    }

    #[test]
    fn data_volume_name_of_empty_id_is_just_the_prefix() {
        assert_eq!(data_volume_name(""), "fakecloud-elasticache-data-");
    }
}