1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Duration;
18
19use k8s_openapi::api::core::v1::{
20 Container, ContainerPort, EnvVar, LocalObjectReference, Pod, PodSpec,
21};
22use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
23use parking_lot::RwLock;
24
25use fakecloud_k8s::{labels, names, K8sClient, K8sEnv, K8sPodConfig};
26
27use super::{BackendInitError, CacheEngineKind, CacheExec, RunningCacheContainer, RuntimeError};
28
/// Service identifier: written as the value of the `labels::SERVICE` pod
/// label and passed to `K8sClient::reap_stale`.
const SERVICE: &str = "elasticache";
/// Prefix handed to `names::pod_name` when deriving a cache pod's name.
const POD_PREFIX: &str = "fakecloud-ec";
/// Name of the single container inside each cache pod; also the container
/// targeted by every `exec` call in this module.
const CONTAINER: &str = "cache";
36
/// Shared, lock-protected map from pod name to the RDB snapshot bytes staged
/// for that pod.
///
/// An entry is inserted right before a Redis pod is launched with a restore
/// URL and removed again once the launch attempt has finished.
// NOTE(review): presumably read by the HTTP handler that serves
// `/_fakecloud/elasticache/_internal/rdb/{pod}` — confirm against the router.
pub type PendingRdb = Arc<RwLock<HashMap<String, Vec<u8>>>>;
42
/// Kubernetes-backed runtime for ElastiCache nodes: each cache resource is
/// run as one pod created through `client`.
#[derive(Clone)]
pub(super) struct K8sCache {
    /// Kubernetes client, connected for the namespace taken from `K8sEnv`.
    client: K8sClient,
    /// Base URL of this server; prefixes the snapshot download URL handed to
    /// restoring pods.
    self_url: String,
    /// Bearer token passed to restoring pods as `FAKECLOUD_RDB_TOKEN`.
    internal_token: String,
    /// Optional image pull secret name attached to every pod.
    pull_secret: Option<String>,
    /// Base pod configuration resolved from `FAKECLOUD_ELASTICACHE_K8S`;
    /// merged with per-resource tag overrides at spawn time.
    pod_config: K8sPodConfig,
    /// Snapshot bytes staged for pods that are currently starting.
    pending_rdb: PendingRdb,
}
58
59impl std::fmt::Debug for K8sCache {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct("K8sCache")
62 .field("namespace", &self.client.namespace())
63 .field("self_url", &self.self_url)
64 .finish_non_exhaustive()
65 }
66}
67
68impl K8sCache {
69 pub(super) async fn from_env(
70 server_port: u16,
71 internal_token: String,
72 ) -> Result<Self, BackendInitError> {
73 let env = K8sEnv::from_env(server_port)?;
74 let pod_config = K8sPodConfig::resolved_base("FAKECLOUD_ELASTICACHE_K8S")?;
75 let client = K8sClient::connect(env.namespace.clone())
76 .await
77 .map_err(|e| BackendInitError::Connect(e.to_string()))?;
78 tracing::info!(
79 namespace = %env.namespace,
80 self_url = %env.self_url,
81 "K8s ElastiCache backend initialized"
82 );
83 Ok(Self {
84 client,
85 self_url: env.self_url,
86 internal_token,
87 pull_secret: env.pull_secret,
88 pod_config,
89 pending_rdb: Arc::new(RwLock::new(HashMap::new())),
90 })
91 }
92
    /// Returns another handle to the shared staged-snapshot map (an `Arc`
    /// clone; the map itself is not copied).
    pub(super) fn pending_rdb(&self) -> PendingRdb {
        self.pending_rdb.clone()
    }
96
97 pub(super) async fn spawn_pod(
100 &self,
101 resource_id: &str,
102 engine: CacheEngineKind,
103 rdb_path: Option<&str>,
104 tags: &std::collections::BTreeMap<String, String>,
105 ) -> Result<RunningCacheContainer, RuntimeError> {
106 let rdb = match rdb_path {
107 Some(path) => Some(tokio::fs::read(path).await.map_err(|e| {
108 RuntimeError::ContainerStartFailed(format!("reading snapshot rdb {path}: {e}"))
109 })?),
110 None => None,
111 };
112 self.spawn_pod_bytes(resource_id, engine, rdb, tags).await
113 }
114
115 async fn spawn_pod_bytes(
116 &self,
117 resource_id: &str,
118 engine: CacheEngineKind,
119 rdb: Option<Vec<u8>>,
120 tags: &std::collections::BTreeMap<String, String>,
121 ) -> Result<RunningCacheContainer, RuntimeError> {
122 let pod_name = names::pod_name(POD_PREFIX, resource_id, resource_id);
123 let port = engine.port();
124
125 let rdb_url = if matches!(engine, CacheEngineKind::Redis) {
128 if let Some(bytes) = rdb {
129 self.pending_rdb.write().insert(pod_name.clone(), bytes);
130 Some(format!(
131 "{}/_fakecloud/elasticache/_internal/rdb/{}",
132 self.self_url.trim_end_matches('/'),
133 pod_name
134 ))
135 } else {
136 None
137 }
138 } else {
139 None
140 };
141
142 let mut pod = build_cache_pod(CachePodContext {
143 pod_name: &pod_name,
144 namespace: self.client.namespace(),
145 instance_id: self.client.instance_id(),
146 resource_id,
147 image: engine.image(),
148 port,
149 rdb_url: rdb_url.as_deref(),
150 internal_token: &self.internal_token,
151 pull_secret: self.pull_secret.as_deref(),
152 });
153 self.pod_config
159 .clone()
160 .merge(K8sPodConfig::from_tags(tags))
161 .apply(&mut pod);
162
163 let result = self.launch(&pod, &pod_name, port, engine).await;
164 self.pending_rdb.write().remove(&pod_name);
167 result
168 }
169
170 async fn launch(
171 &self,
172 pod: &Pod,
173 pod_name: &str,
174 port: u16,
175 engine: CacheEngineKind,
176 ) -> Result<RunningCacheContainer, RuntimeError> {
177 self.client
178 .create_pod(pod)
179 .await
180 .map_err(|e| RuntimeError::ContainerStartFailed(format!("create cache pod: {e}")))?;
181
182 let pod_ip = match self
183 .client
184 .wait_for_pod_ip(pod_name, Duration::from_secs(90))
185 .await
186 {
187 Ok(ip) => ip,
188 Err(e) => {
189 self.client.delete_pod(pod_name).await;
190 return Err(RuntimeError::ContainerStartFailed(e.to_string()));
191 }
192 };
193 if let Err(e) = K8sClient::wait_for_tcp(&pod_ip, port, Duration::from_secs(30)).await {
194 self.client.delete_pod(pod_name).await;
195 return Err(RuntimeError::ContainerStartFailed(format!(
196 "cache pod {pod_name} ({pod_ip}:{port}) not ready: {e}"
197 )));
198 }
199
200 Ok(RunningCacheContainer {
201 container_id: pod_name.to_string(),
202 host_port: port,
203 endpoint_address: pod_ip,
204 endpoint_port: port,
205 engine,
206 })
207 }
208
    /// Asks the Kubernetes client to delete the pod named `pod_name`.
    ///
    /// Nothing is reported back to the caller.
    // NOTE(review): failures appear to be handled inside
    // `K8sClient::delete_pod` — confirm it logs rather than drops them.
    pub(super) async fn delete_pod(&self, pod_name: &str) {
        self.client.delete_pod(pod_name).await;
    }
212
213 pub(super) async fn exec_redis(
214 &self,
215 pod_name: &str,
216 redis_args: &[String],
217 ) -> Result<CacheExec, RuntimeError> {
218 let mut cmd: Vec<&str> = vec!["redis-cli"];
219 cmd.extend(redis_args.iter().map(String::as_str));
220 let out = self
221 .client
222 .exec(pod_name, Some(CONTAINER), &cmd)
223 .await
224 .map_err(|e| RuntimeError::ContainerStartFailed(format!("exec redis-cli: {e}")))?;
225 Ok(CacheExec {
226 success: out.success(),
227 stdout: out.stdout,
228 stderr: out.stderr.into_bytes(),
229 })
230 }
231
232 pub(super) async fn dump_rdb(
233 &self,
234 pod_name: &str,
235 dest_path: &str,
236 ) -> Result<(), RuntimeError> {
237 let save = self
238 .client
239 .exec(pod_name, Some(CONTAINER), &["redis-cli", "SAVE"])
240 .await
241 .map_err(|e| RuntimeError::ContainerStartFailed(format!("exec SAVE: {e}")))?;
242 if !save.success() {
243 return Err(RuntimeError::ContainerStartFailed(format!(
244 "redis SAVE failed: {}",
245 save.stderr.trim()
246 )));
247 }
248 let cat = self
249 .client
250 .exec(pod_name, Some(CONTAINER), &["cat", "/data/dump.rdb"])
251 .await
252 .map_err(|e| RuntimeError::ContainerStartFailed(format!("exec cat rdb: {e}")))?;
253 if !cat.success() {
254 return Err(RuntimeError::ContainerStartFailed(format!(
255 "reading dump.rdb from pod failed: {}",
256 cat.stderr.trim()
257 )));
258 }
259 tokio::fs::write(dest_path, &cat.stdout)
260 .await
261 .map_err(|e| RuntimeError::ContainerStartFailed(format!("writing {dest_path}: {e}")))?;
262 Ok(())
263 }
264
265 pub(super) async fn reboot_pod(
272 &self,
273 resource_id: &str,
274 running: &RunningCacheContainer,
275 tags: &std::collections::BTreeMap<String, String>,
276 ) -> Result<RunningCacheContainer, RuntimeError> {
277 let preserved = if matches!(running.engine, CacheEngineKind::Redis) {
278 self.snapshot_live_rdb(&running.container_id).await
279 } else {
280 None
281 };
282 self.client.delete_pod(&running.container_id).await;
283 self.spawn_pod_bytes(resource_id, running.engine, preserved, tags)
284 .await
285 }
286
287 async fn snapshot_live_rdb(&self, pod_name: &str) -> Option<Vec<u8>> {
291 let _ = self
292 .client
293 .exec(pod_name, Some(CONTAINER), &["redis-cli", "SAVE"])
294 .await;
295 let cat = self
296 .client
297 .exec(pod_name, Some(CONTAINER), &["cat", "/data/dump.rdb"])
298 .await
299 .ok()?;
300 if cat.success() && !cat.stdout.is_empty() {
301 Some(cat.stdout)
302 } else {
303 None
304 }
305 }
306
    /// Delegates to `K8sClient::reap_stale` for the `elasticache` service.
    // NOTE(review): presumably removes pods left behind by earlier instances
    // — confirm against `K8sClient::reap_stale`.
    pub(super) async fn reap_stale(&self) {
        self.client.reap_stale(SERVICE).await;
    }
310}
311
/// Borrowed inputs for `build_cache_pod`.
struct CachePodContext<'a> {
    /// Pod `metadata.name`.
    pod_name: &'a str,
    /// Pod `metadata.namespace`.
    namespace: &'a str,
    /// Value of the `labels::INSTANCE` label.
    instance_id: &'a str,
    /// Cache resource id; passed through `names::label_safe` for the
    /// `fakecloud-elasticache` label.
    resource_id: &'a str,
    /// Container image to run.
    image: &'a str,
    /// Port exposed by the container.
    port: u16,
    /// When set, the container command is overridden to download this URL to
    /// `/data/dump.rdb` before starting `redis-server`.
    rdb_url: Option<&'a str>,
    /// Bearer token for the download; exposed as `FAKECLOUD_RDB_TOKEN` only
    /// when `rdb_url` is set.
    internal_token: &'a str,
    /// Optional image pull secret name.
    pull_secret: Option<&'a str>,
}
326
327fn build_cache_pod(ctx: CachePodContext<'_>) -> Pod {
330 let mut pod_labels = std::collections::BTreeMap::new();
331 pod_labels.insert(
332 labels::MANAGED_BY.to_string(),
333 labels::MANAGED_BY_VALUE.to_string(),
334 );
335 pod_labels.insert(labels::INSTANCE.to_string(), ctx.instance_id.to_string());
336 pod_labels.insert(labels::SERVICE.to_string(), SERVICE.to_string());
337 pod_labels.insert(
338 "fakecloud-elasticache".to_string(),
339 names::label_safe(ctx.resource_id),
340 );
341
342 let (command, env) = match ctx.rdb_url {
346 Some(url) => {
347 let script = "set -e; \
348 wget -q --header=\"authorization: Bearer $FAKECLOUD_RDB_TOKEN\" \
349 -O /data/dump.rdb \"$FAKECLOUD_RDB_URL\"; \
350 exec redis-server"
351 .to_string();
352 (
353 Some(vec!["sh".to_string(), "-c".to_string(), script]),
354 Some(vec![
355 EnvVar {
356 name: "FAKECLOUD_RDB_URL".to_string(),
357 value: Some(url.to_string()),
358 value_from: None,
359 },
360 EnvVar {
361 name: "FAKECLOUD_RDB_TOKEN".to_string(),
362 value: Some(ctx.internal_token.to_string()),
363 value_from: None,
364 },
365 ]),
366 )
367 }
368 None => (None, None),
369 };
370
371 let container = Container {
372 name: CONTAINER.to_string(),
373 image: Some(ctx.image.to_string()),
374 command,
375 env,
376 ports: Some(vec![ContainerPort {
377 container_port: ctx.port as i32,
378 ..ContainerPort::default()
379 }]),
380 ..Container::default()
381 };
382
383 let pull_secrets = ctx.pull_secret.map(|name| {
384 vec![LocalObjectReference {
385 name: name.to_string(),
386 }]
387 });
388
389 Pod {
390 metadata: ObjectMeta {
391 name: Some(ctx.pod_name.to_string()),
392 namespace: Some(ctx.namespace.to_string()),
393 labels: Some(pod_labels),
394 ..ObjectMeta::default()
395 },
396 spec: Some(PodSpec {
397 restart_policy: Some("Never".to_string()),
400 containers: vec![container],
401 image_pull_secrets: pull_secrets,
402 ..PodSpec::default()
403 }),
404 ..Pod::default()
405 }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    /// Baseline pod context for a Redis node; only `rdb_url` varies per test.
    fn ctx(rdb_url: Option<&str>) -> CachePodContext<'_> {
        CachePodContext {
            pod_name: "fakecloud-ec-mycache-abc123",
            namespace: "fakecloud",
            instance_id: "fakecloud-1234",
            resource_id: "My_Cache",
            image: "redis:7-alpine",
            port: 6379,
            rdb_url,
            internal_token: "secret-token",
            pull_secret: None,
        }
    }

    /// Extracts the first container of a built pod.
    fn first_container(pod: Pod) -> Container {
        pod.spec
            .unwrap()
            .containers
            .into_iter()
            .next()
            .unwrap()
    }

    #[test]
    fn pod_has_ownership_labels() {
        let built = build_cache_pod(ctx(None));
        let pod_labels = built.metadata.labels.unwrap();
        assert_eq!(
            pod_labels.get(labels::MANAGED_BY).unwrap(),
            labels::MANAGED_BY_VALUE
        );
        assert_eq!(pod_labels.get(labels::SERVICE).unwrap(), "elasticache");
        assert_eq!(pod_labels.get(labels::INSTANCE).unwrap(), "fakecloud-1234");
        assert_eq!(
            pod_labels.get("fakecloud-elasticache").unwrap(),
            "my-cache"
        );
    }

    #[test]
    fn container_exposes_engine_port_and_image() {
        let container = first_container(build_cache_pod(ctx(None)));
        assert_eq!(container.image.as_deref(), Some("redis:7-alpine"));
        assert_eq!(container.ports.as_ref().unwrap()[0].container_port, 6379);
    }

    #[test]
    fn no_rdb_uses_default_entrypoint() {
        let container = first_container(build_cache_pod(ctx(None)));
        assert!(container.command.is_none());
        assert!(container.env.is_none());
    }

    #[test]
    fn rdb_restore_overrides_command_and_sets_env() {
        let url = "http://fc:4566/_fakecloud/elasticache/_internal/rdb/p";
        let container = first_container(build_cache_pod(ctx(Some(url))));

        let script = container.command.as_ref().unwrap().last().unwrap();
        assert!(script.contains("wget"), "should fetch rdb: {script}");
        assert!(script.contains("/data/dump.rdb"));
        assert!(script.contains("exec redis-server"));
        assert!(script.contains("$FAKECLOUD_RDB_TOKEN"));
        // The token travels via env only, never inline in the command.
        assert!(!script.contains("secret-token"));

        let vars = container.env.as_ref().unwrap();
        assert!(vars.iter().any(|var| var.name == "FAKECLOUD_RDB_URL"));
        assert!(vars.iter().any(|var| {
            var.name == "FAKECLOUD_RDB_TOKEN" && var.value.as_deref() == Some("secret-token")
        }));
    }

    #[test]
    fn restart_policy_never() {
        let spec = build_cache_pod(ctx(None)).spec.unwrap();
        assert_eq!(spec.restart_policy.as_deref(), Some("Never"));
    }

    #[test]
    fn pull_secret_attached_when_set() {
        let with_secret = CachePodContext {
            pull_secret: Some("reg-secret"),
            ..ctx(None)
        };
        let spec = build_cache_pod(with_secret).spec.unwrap();
        let secrets = spec.image_pull_secrets.unwrap();
        assert_eq!(secrets[0].name, "reg-secret");
    }

    #[test]
    fn pod_config_base_applies_to_built_pod() {
        let mut pod = build_cache_pod(ctx(None));
        let cfg = K8sPodConfig {
            node_selector: BTreeMap::from([("pool".to_string(), "cache".to_string())]),
            annotations: BTreeMap::from([("team".to_string(), "platform".to_string())]),
            ..Default::default()
        };
        cfg.apply(&mut pod);

        let node_selector = pod.spec.unwrap().node_selector.unwrap();
        let annotations = pod.metadata.annotations.unwrap();
        assert_eq!(
            node_selector.get("pool").map(String::as_str),
            Some("cache")
        );
        assert_eq!(
            annotations.get("team").map(String::as_str),
            Some("platform")
        );
    }

    #[test]
    fn pod_config_overrides_apply_to_built_pod() {
        let mut pod = build_cache_pod(ctx(None));
        let base = K8sPodConfig {
            node_selector: BTreeMap::from([("pool".to_string(), "cache".to_string())]),
            ..Default::default()
        };
        let tags = BTreeMap::from([
            (
                "fakecloud-k8s/node-selector".to_string(),
                "pool=spot,disktype=ssd".to_string(),
            ),
            (
                "fakecloud-k8s/annotations".to_string(),
                "team=data".to_string(),
            ),
        ]);
        let merged = base.merge(K8sPodConfig::from_tags(&tags));
        merged.apply(&mut pod);

        let node_selector = pod.spec.unwrap().node_selector.unwrap();
        let annotations = pod.metadata.annotations.unwrap();
        // The tag value replaces the base `pool` and adds `disktype`.
        assert_eq!(node_selector.get("pool").map(String::as_str), Some("spot"));
        assert_eq!(
            node_selector.get("disktype").map(String::as_str),
            Some("ssd")
        );
        assert_eq!(annotations.get("team").map(String::as_str), Some("data"));
    }
}