1mod k8s;
12
13use std::collections::{BTreeMap, HashMap};
14use std::sync::Arc;
15use std::time::Duration;
16
17use parking_lot::RwLock;
18
19pub use k8s::PendingRdb;
20
/// Cache engines the runtime knows how to launch.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CacheEngineKind {
    /// Redis, launched from the `redis:7-alpine` image on container port 6379.
    Redis,
    /// Memcached, launched from the `memcached:1.6-alpine` image on container port 11211.
    Memcached,
}
27
28impl CacheEngineKind {
29 fn image(self) -> &'static str {
31 match self {
32 CacheEngineKind::Redis => "redis:7-alpine",
33 CacheEngineKind::Memcached => "memcached:1.6-alpine",
34 }
35 }
36
37 fn port(self) -> u16 {
39 match self {
40 CacheEngineKind::Redis => 6379,
41 CacheEngineKind::Memcached => 11211,
42 }
43 }
44}
45
46#[derive(Debug, Clone)]
48pub struct RunningCacheContainer {
49 pub container_id: String,
51 pub host_port: u16,
54 pub endpoint_address: String,
57 pub endpoint_port: u16,
60 pub engine: CacheEngineKind,
63}
64
/// Captured result of running a command (e.g. `redis-cli`) inside a cache container.
#[derive(Clone, Debug)]
pub struct CacheExec {
    /// Whether the command exited successfully.
    pub success: bool,
    /// Raw bytes the command wrote to standard output.
    pub stdout: Vec<u8>,
    /// Raw bytes the command wrote to standard error.
    pub stderr: Vec<u8>,
}
76
77#[derive(Debug, thiserror::Error)]
78pub enum RuntimeError {
79 #[error("container runtime is unavailable")]
80 Unavailable,
81 #[error("container failed to start: {0}")]
82 ContainerStartFailed(String),
83}
84
85#[derive(Debug, thiserror::Error)]
89pub enum BackendInitError {
90 #[error(transparent)]
91 Env(#[from] fakecloud_k8s::K8sEnvError),
92 #[error(transparent)]
93 PodConfig(#[from] fakecloud_k8s::K8sPodConfigError),
94 #[error("failed to connect to the Kubernetes cluster: {0}")]
95 Connect(String),
96}
97
98#[derive(Debug, Clone)]
100enum CacheBackend {
101 Docker(DockerCache),
102 K8s(k8s::K8sCache),
103}
104
105#[derive(Debug, Clone)]
106pub struct ElastiCacheRuntime {
107 backend: CacheBackend,
108 containers: Arc<RwLock<HashMap<String, RunningCacheContainer>>>,
109}
110
111impl ElastiCacheRuntime {
112 pub fn new() -> Option<Self> {
115 let cli = fakecloud_core::container_net::detect_container_cli()?;
116 let net = fakecloud_core::container_net::HostNetworking::detect(&cli);
117 Some(Self {
118 backend: CacheBackend::Docker(DockerCache {
119 cli,
120 net,
121 instance_id: format!("fakecloud-{}", std::process::id()),
122 }),
123 containers: Arc::new(RwLock::new(HashMap::new())),
124 })
125 }
126
127 pub async fn new_k8s(
133 server_port: u16,
134 internal_token: String,
135 ) -> Result<Self, BackendInitError> {
136 let cache = k8s::K8sCache::from_env(server_port, internal_token).await?;
137 Ok(Self {
138 backend: CacheBackend::K8s(cache),
139 containers: Arc::new(RwLock::new(HashMap::new())),
140 })
141 }
142
143 pub fn cli_name(&self) -> &str {
145 match &self.backend {
146 CacheBackend::Docker(d) => &d.cli,
147 CacheBackend::K8s(_) => "kubernetes",
148 }
149 }
150
151 pub fn pending_rdb(&self) -> Option<PendingRdb> {
155 match &self.backend {
156 CacheBackend::K8s(k) => Some(k.pending_rdb()),
157 CacheBackend::Docker(_) => None,
158 }
159 }
160
161 pub fn endpoint_host(&self) -> &str {
167 match &self.backend {
168 CacheBackend::Docker(d) => &d.net.sibling_host,
169 CacheBackend::K8s(_) => "127.0.0.1",
170 }
171 }
172
173 pub async fn ensure_redis(
174 &self,
175 resource_id: &str,
176 rdb_path: Option<&str>,
177 tags: &BTreeMap<String, String>,
178 ) -> Result<RunningCacheContainer, RuntimeError> {
179 let running = match &self.backend {
180 CacheBackend::Docker(d) => {
181 d.spawn_container(resource_id, CacheEngineKind::Redis, rdb_path)
182 .await?
183 }
184 CacheBackend::K8s(k) => {
185 k.spawn_pod(resource_id, CacheEngineKind::Redis, rdb_path, tags)
186 .await?
187 }
188 };
189 self.containers
190 .write()
191 .insert(resource_id.to_string(), running.clone());
192 Ok(running)
193 }
194
195 pub async fn ensure_memcached(
196 &self,
197 resource_id: &str,
198 tags: &BTreeMap<String, String>,
199 ) -> Result<RunningCacheContainer, RuntimeError> {
200 let running = match &self.backend {
201 CacheBackend::Docker(d) => {
202 d.spawn_container(resource_id, CacheEngineKind::Memcached, None)
203 .await?
204 }
205 CacheBackend::K8s(k) => {
206 k.spawn_pod(resource_id, CacheEngineKind::Memcached, None, tags)
207 .await?
208 }
209 };
210 self.containers
211 .write()
212 .insert(resource_id.to_string(), running.clone());
213 Ok(running)
214 }
215
216 pub async fn stop_container(&self, resource_id: &str) {
217 let container = self.containers.write().remove(resource_id);
218 if let Some(container) = container {
219 match &self.backend {
220 CacheBackend::Docker(d) => d.remove_container(&container.container_id).await,
221 CacheBackend::K8s(k) => k.delete_pod(&container.container_id).await,
222 }
223 }
224 }
225
226 pub async fn remove_data_volume(&self, resource_id: &str) {
231 if let CacheBackend::Docker(d) = &self.backend {
232 d.remove_data_volume(resource_id).await;
233 }
234 }
235
236 pub async fn restart_container(
240 &self,
241 resource_id: &str,
242 tags: &BTreeMap<String, String>,
243 ) -> Result<(), RuntimeError> {
244 let running = {
245 let containers = self.containers.read();
246 containers.get(resource_id).cloned()
247 };
248 let running = running.ok_or(RuntimeError::Unavailable)?;
249 match &self.backend {
250 CacheBackend::Docker(d) => d.restart_container(&running.container_id).await,
251 CacheBackend::K8s(k) => {
252 let updated = k.reboot_pod(resource_id, &running, tags).await?;
258 self.containers
259 .write()
260 .insert(resource_id.to_string(), updated);
261 Ok(())
262 }
263 }
264 }
265
266 pub async fn exec_redis(
268 &self,
269 resource_id: &str,
270 redis_args: &[String],
271 ) -> Result<CacheExec, RuntimeError> {
272 let container_id = {
273 let containers = self.containers.read();
274 containers
275 .get(resource_id)
276 .map(|c| c.container_id.clone())
277 .ok_or(RuntimeError::Unavailable)?
278 };
279 match &self.backend {
280 CacheBackend::Docker(d) => d.exec_redis(&container_id, redis_args).await,
281 CacheBackend::K8s(k) => k.exec_redis(&container_id, redis_args).await,
282 }
283 }
284
285 pub async fn dump_rdb(&self, resource_id: &str, dest_path: &str) -> Result<(), RuntimeError> {
288 let container_id = {
289 let containers = self.containers.read();
290 containers
291 .get(resource_id)
292 .map(|c| c.container_id.clone())
293 .ok_or(RuntimeError::Unavailable)?
294 };
295 match &self.backend {
296 CacheBackend::Docker(d) => d.dump_rdb(&container_id, dest_path).await,
297 CacheBackend::K8s(k) => k.dump_rdb(&container_id, dest_path).await,
298 }
299 }
300
301 pub async fn stop_all(&self) {
302 let containers: Vec<RunningCacheContainer> = {
303 let mut containers = self.containers.write();
304 containers.drain().map(|(_, c)| c).collect()
305 };
306 for c in containers {
307 match &self.backend {
308 CacheBackend::Docker(d) => d.remove_container(&c.container_id).await,
309 CacheBackend::K8s(k) => k.delete_pod(&c.container_id).await,
310 }
311 }
312 }
313
314 pub async fn reap_stale(&self) {
317 if let CacheBackend::K8s(k) = &self.backend {
318 k.reap_stale().await;
319 }
320 }
321}
322
323#[derive(Debug, Clone)]
326struct DockerCache {
327 cli: String,
328 net: fakecloud_core::container_net::HostNetworking,
329 instance_id: String,
330}
331
332impl DockerCache {
333 async fn spawn_container(
334 &self,
335 resource_id: &str,
336 engine: CacheEngineKind,
337 rdb_path: Option<&str>,
338 ) -> Result<RunningCacheContainer, RuntimeError> {
339 let image = engine.image();
340 let container_port = engine.port();
341
342 let mut args: Vec<String> = vec![
343 "create".to_string(),
344 "-p".to_string(),
345 format!(":{container_port}"),
346 "--label".to_string(),
347 format!("fakecloud-elasticache={resource_id}"),
348 "--label".to_string(),
349 format!("fakecloud-instance={}", self.instance_id),
350 ];
351
352 if matches!(engine, CacheEngineKind::Redis) {
360 args.push("-v".to_string());
361 args.push(format!("{}:/data", data_volume_name(resource_id)));
362 }
363 args.push(image.to_string());
364
365 let output = tokio::process::Command::new(&self.cli)
366 .args(&args)
367 .output()
368 .await
369 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
370
371 if !output.status.success() {
372 return Err(RuntimeError::ContainerStartFailed(
373 String::from_utf8_lossy(&output.stderr).trim().to_string(),
374 ));
375 }
376
377 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
378
379 if let Some(path) = rdb_path {
389 let cp_result = tokio::process::Command::new(&self.cli)
390 .args(["cp", path, &format!("{container_id}:/data/dump.rdb")])
391 .output()
392 .await
393 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
394 if !cp_result.status.success() {
395 self.remove_container(&container_id).await;
396 return Err(RuntimeError::ContainerStartFailed(format!(
397 "failed to stage snapshot rdb into container: {}",
398 String::from_utf8_lossy(&cp_result.stderr).trim()
399 )));
400 }
401 }
402
403 let start_result = tokio::process::Command::new(&self.cli)
404 .args(["start", &container_id])
405 .output()
406 .await
407 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
408
409 if !start_result.status.success() {
410 self.remove_container(&container_id).await;
411 return Err(RuntimeError::ContainerStartFailed(format!(
412 "container start failed: {}",
413 String::from_utf8_lossy(&start_result.stderr).trim()
414 )));
415 }
416
417 let host_port = match self.lookup_port(&container_id, container_port).await {
418 Ok(host_port) => host_port,
419 Err(error) => {
420 self.remove_container(&container_id).await;
421 return Err(error);
422 }
423 };
424
425 let wait_result = match engine {
426 CacheEngineKind::Redis => self.wait_for_redis(host_port).await,
427 CacheEngineKind::Memcached => self.wait_for_memcached(host_port).await,
428 };
429 if let Err(error) = wait_result {
430 self.remove_container(&container_id).await;
431 return Err(error);
432 }
433
434 Ok(RunningCacheContainer {
435 container_id,
436 host_port,
437 endpoint_address: self.net.sibling_host.clone(),
442 endpoint_port: host_port,
443 engine,
444 })
445 }
446
447 async fn restart_container(&self, container_id: &str) -> Result<(), RuntimeError> {
448 let output = tokio::process::Command::new(&self.cli)
449 .args(["restart", container_id])
450 .output()
451 .await
452 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
453 if !output.status.success() {
454 return Err(RuntimeError::ContainerStartFailed(
455 String::from_utf8_lossy(&output.stderr).trim().to_string(),
456 ));
457 }
458 Ok(())
459 }
460
461 async fn exec_redis(
462 &self,
463 container_id: &str,
464 redis_args: &[String],
465 ) -> Result<CacheExec, RuntimeError> {
466 let mut args = vec![
467 "exec".to_string(),
468 container_id.to_string(),
469 "redis-cli".to_string(),
470 ];
471 args.extend_from_slice(redis_args);
472 let out = tokio::process::Command::new(&self.cli)
473 .args(&args)
474 .output()
475 .await
476 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
477 Ok(CacheExec {
478 success: out.status.success(),
479 stdout: out.stdout,
480 stderr: out.stderr,
481 })
482 }
483
484 async fn dump_rdb(&self, container_id: &str, dest_path: &str) -> Result<(), RuntimeError> {
485 let save_output = tokio::process::Command::new(&self.cli)
486 .args(["exec", container_id, "redis-cli", "SAVE"])
487 .output()
488 .await
489 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
490 if !save_output.status.success() {
491 return Err(RuntimeError::ContainerStartFailed(
492 String::from_utf8_lossy(&save_output.stderr)
493 .trim()
494 .to_string(),
495 ));
496 }
497
498 let cp_output = tokio::process::Command::new(&self.cli)
499 .args(["cp", &format!("{container_id}:/data/dump.rdb"), dest_path])
500 .output()
501 .await
502 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
503 if !cp_output.status.success() {
504 return Err(RuntimeError::ContainerStartFailed(
505 String::from_utf8_lossy(&cp_output.stderr)
506 .trim()
507 .to_string(),
508 ));
509 }
510 Ok(())
511 }
512
513 async fn lookup_port(
514 &self,
515 container_id: &str,
516 container_port: u16,
517 ) -> Result<u16, RuntimeError> {
518 let port_output = tokio::process::Command::new(&self.cli)
519 .args(["port", container_id, &container_port.to_string()])
520 .output()
521 .await
522 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
523
524 if !port_output.status.success() {
525 let stderr = String::from_utf8_lossy(&port_output.stderr);
526 return Err(RuntimeError::ContainerStartFailed(format!(
527 "port lookup failed: {stderr}"
528 )));
529 }
530
531 let port_str = String::from_utf8_lossy(&port_output.stdout);
532 port_str
533 .trim()
534 .rsplit(':')
535 .next()
536 .and_then(|value| value.parse::<u16>().ok())
537 .ok_or_else(|| {
538 RuntimeError::ContainerStartFailed(format!(
539 "could not determine redis port from '{}'",
540 port_str.trim()
541 ))
542 })
543 }
544
545 async fn wait_for_redis(&self, host_port: u16) -> Result<(), RuntimeError> {
546 let host = &self.net.sibling_host;
550 for _ in 0..40 {
551 tokio::time::sleep(Duration::from_millis(500)).await;
552 if tokio::net::TcpStream::connect(format!("{host}:{host_port}"))
553 .await
554 .is_ok()
555 {
556 return Ok(());
557 }
558 }
559
560 Err(RuntimeError::ContainerStartFailed(
561 "redis container did not become ready within 20 seconds".to_string(),
562 ))
563 }
564
565 async fn wait_for_memcached(&self, host_port: u16) -> Result<(), RuntimeError> {
566 use tokio::io::{AsyncReadExt, AsyncWriteExt};
567 let host = &self.net.sibling_host;
568 for _ in 0..40 {
569 tokio::time::sleep(Duration::from_millis(500)).await;
570 let Ok(mut stream) =
571 tokio::net::TcpStream::connect(format!("{host}:{host_port}")).await
572 else {
573 continue;
574 };
575 if stream.write_all(b"version\r\n").await.is_err() {
576 continue;
577 }
578 let mut buf = [0u8; 32];
579 match tokio::time::timeout(Duration::from_secs(2), stream.read(&mut buf)).await {
580 Ok(Ok(n)) if n > 0 && buf.starts_with(b"VERSION") => return Ok(()),
581 _ => continue,
582 }
583 }
584
585 Err(RuntimeError::ContainerStartFailed(
586 "memcached container did not become ready within 20 seconds".to_string(),
587 ))
588 }
589
590 async fn remove_container(&self, container_id: &str) {
591 let _ = tokio::process::Command::new(&self.cli)
592 .args(["rm", "-f", container_id])
593 .output()
594 .await;
595 }
596
597 async fn remove_data_volume(&self, resource_id: &str) {
598 let _ = tokio::process::Command::new(&self.cli)
599 .args(["volume", "rm", "-f", &data_volume_name(resource_id)])
600 .output()
601 .await;
602 }
603}
604
/// Deterministic name of the Docker volume backing `/data` for `resource_id`.
///
/// Every character outside `[A-Za-z0-9_.-]` is replaced by `-`, so the same id always maps
/// to the same volume name.
fn data_volume_name(resource_id: &str) -> String {
    let mut name = String::from("fakecloud-elasticache-data-");
    for ch in resource_id.chars() {
        let allowed = ch.is_ascii_alphanumeric() || matches!(ch, '_' | '.' | '-');
        name.push(if allowed { ch } else { '-' });
    }
    name
}
622
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn data_volume_name_is_stable_and_sanitized() {
        let cases = [
            ("my-cache", "fakecloud-elasticache-data-my-cache"),
            ("weird/id:1", "fakecloud-elasticache-data-weird-id-1"),
        ];
        for (input, expected) in cases {
            assert_eq!(data_volume_name(input), expected, "input: {input}");
        }
    }
}