fakecloud_elasticache/
runtime.rs1use std::collections::HashMap;
2use std::time::Duration;
3
4use parking_lot::RwLock;
5
/// Handle describing a cache container that has been started by the runtime.
///
/// Values of this type are cheap to clone and carry everything a caller needs
/// to reach or later remove the container.
#[derive(Clone, Debug)]
pub struct RunningCacheContainer {
    /// Identifier of the container as reported by the container CLI.
    pub container_id: String,
    /// Port on the host that is mapped to the cache port inside the container.
    pub host_port: u16,
}
11
12pub struct ElastiCacheRuntime {
13 cli: String,
14 containers: RwLock<HashMap<String, RunningCacheContainer>>,
15 instance_id: String,
16}
17
18#[derive(Debug, thiserror::Error)]
19pub enum RuntimeError {
20 #[error("container runtime is unavailable")]
21 Unavailable,
22 #[error("container failed to start: {0}")]
23 ContainerStartFailed(String),
24}
25
26impl ElastiCacheRuntime {
27 pub fn new() -> Option<Self> {
28 let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
29 if cli_available(&cli) {
30 cli
31 } else {
32 return None;
33 }
34 } else if cli_available("docker") {
35 "docker".to_string()
36 } else if cli_available("podman") {
37 "podman".to_string()
38 } else {
39 return None;
40 };
41
42 Some(Self {
43 cli,
44 containers: RwLock::new(HashMap::new()),
45 instance_id: format!("fakecloud-{}", std::process::id()),
46 })
47 }
48
49 pub fn cli_name(&self) -> &str {
50 &self.cli
51 }
52
53 pub async fn ensure_redis(
54 &self,
55 resource_id: &str,
56 ) -> Result<RunningCacheContainer, RuntimeError> {
57 self.stop_container(resource_id).await;
58
59 let output = tokio::process::Command::new(&self.cli)
60 .args([
61 "create",
62 "-p",
63 ":6379",
64 "--label",
65 &format!("fakecloud-elasticache={resource_id}"),
66 "--label",
67 &format!("fakecloud-instance={}", self.instance_id),
68 "redis:7-alpine",
69 ])
70 .output()
71 .await
72 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
73
74 if !output.status.success() {
75 return Err(RuntimeError::ContainerStartFailed(
76 String::from_utf8_lossy(&output.stderr).trim().to_string(),
77 ));
78 }
79
80 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
81 let start_result = tokio::process::Command::new(&self.cli)
82 .args(["start", &container_id])
83 .output()
84 .await
85 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
86
87 if !start_result.status.success() {
88 self.remove_container(&container_id).await;
89 return Err(RuntimeError::ContainerStartFailed(format!(
90 "container start failed: {}",
91 String::from_utf8_lossy(&start_result.stderr).trim()
92 )));
93 }
94
95 let host_port = match self.lookup_port(&container_id, 6379).await {
96 Ok(host_port) => host_port,
97 Err(error) => {
98 self.remove_container(&container_id).await;
99 return Err(error);
100 }
101 };
102
103 if let Err(error) = self.wait_for_redis(host_port).await {
104 self.remove_container(&container_id).await;
105 return Err(error);
106 }
107
108 let running = RunningCacheContainer {
109 container_id,
110 host_port,
111 };
112 self.containers
113 .write()
114 .insert(resource_id.to_string(), running.clone());
115 Ok(running)
116 }
117
118 pub async fn stop_container(&self, resource_id: &str) {
119 let container = self.containers.write().remove(resource_id);
120 if let Some(container) = container {
121 self.remove_container(&container.container_id).await;
122 }
123 }
124
125 pub async fn stop_all(&self) {
126 let containers: Vec<String> = {
127 let mut containers = self.containers.write();
128 containers
129 .drain()
130 .map(|(_, container)| container.container_id)
131 .collect()
132 };
133 for container_id in containers {
134 self.remove_container(&container_id).await;
135 }
136 }
137
138 async fn lookup_port(
139 &self,
140 container_id: &str,
141 container_port: u16,
142 ) -> Result<u16, RuntimeError> {
143 let port_output = tokio::process::Command::new(&self.cli)
144 .args(["port", container_id, &container_port.to_string()])
145 .output()
146 .await
147 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
148
149 if !port_output.status.success() {
150 let stderr = String::from_utf8_lossy(&port_output.stderr);
151 return Err(RuntimeError::ContainerStartFailed(format!(
152 "port lookup failed: {stderr}"
153 )));
154 }
155
156 let port_str = String::from_utf8_lossy(&port_output.stdout);
157 port_str
158 .trim()
159 .rsplit(':')
160 .next()
161 .and_then(|value| value.parse::<u16>().ok())
162 .ok_or_else(|| {
163 RuntimeError::ContainerStartFailed(format!(
164 "could not determine redis port from '{}'",
165 port_str.trim()
166 ))
167 })
168 }
169
170 async fn wait_for_redis(&self, host_port: u16) -> Result<(), RuntimeError> {
171 for _ in 0..40 {
172 tokio::time::sleep(Duration::from_millis(500)).await;
173 if tokio::net::TcpStream::connect(format!("127.0.0.1:{host_port}"))
174 .await
175 .is_ok()
176 {
177 return Ok(());
178 }
179 }
180
181 Err(RuntimeError::ContainerStartFailed(
182 "redis container did not become ready within 20 seconds".to_string(),
183 ))
184 }
185
186 async fn remove_container(&self, container_id: &str) {
187 let _ = tokio::process::Command::new(&self.cli)
188 .args(["rm", "-f", container_id])
189 .output()
190 .await;
191 }
192}
193
/// Reports whether `cli` can be executed and answers `info` successfully.
///
/// Runs `<cli> info` with stdout and stderr discarded. Returns `false` both
/// when the process cannot be spawned and when it exits unsuccessfully.
fn cli_available(cli: &str) -> bool {
    let mut probe = std::process::Command::new(cli);
    probe
        .arg("info")
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());

    match probe.status() {
        Ok(exit) => exit.success(),
        // Spawn failure (e.g. binary not found) counts as "not available".
        Err(_) => false,
    }
}