Skip to main content

fakecloud_elasticache/
runtime.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use parking_lot::RwLock;
5
/// Handle describing one cache container that this process started.
///
/// Values are cheap to clone; the runtime keeps one copy in its registry and
/// hands another back to the caller.
#[derive(Clone, Debug)]
pub struct RunningCacheContainer {
    /// Identifier the container CLI printed when the container was created.
    pub container_id: String,
    /// Port on the host that is mapped to the container's cache port.
    pub host_port: u16,
}
11
12pub struct ElastiCacheRuntime {
13    cli: String,
14    containers: RwLock<HashMap<String, RunningCacheContainer>>,
15    instance_id: String,
16}
17
18#[derive(Debug, thiserror::Error)]
19pub enum RuntimeError {
20    #[error("container runtime is unavailable")]
21    Unavailable,
22    #[error("container failed to start: {0}")]
23    ContainerStartFailed(String),
24}
25
26impl ElastiCacheRuntime {
27    pub fn new() -> Option<Self> {
28        let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
29            if cli_available(&cli) {
30                cli
31            } else {
32                return None;
33            }
34        } else if cli_available("docker") {
35            "docker".to_string()
36        } else if cli_available("podman") {
37            "podman".to_string()
38        } else {
39            return None;
40        };
41
42        Some(Self {
43            cli,
44            containers: RwLock::new(HashMap::new()),
45            instance_id: format!("fakecloud-{}", std::process::id()),
46        })
47    }
48
49    pub fn cli_name(&self) -> &str {
50        &self.cli
51    }
52
53    pub async fn ensure_redis(
54        &self,
55        resource_id: &str,
56    ) -> Result<RunningCacheContainer, RuntimeError> {
57        self.stop_container(resource_id).await;
58
59        let output = tokio::process::Command::new(&self.cli)
60            .args([
61                "create",
62                "-p",
63                ":6379",
64                "--label",
65                &format!("fakecloud-elasticache={resource_id}"),
66                "--label",
67                &format!("fakecloud-instance={}", self.instance_id),
68                "redis:7-alpine",
69            ])
70            .output()
71            .await
72            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
73
74        if !output.status.success() {
75            return Err(RuntimeError::ContainerStartFailed(
76                String::from_utf8_lossy(&output.stderr).trim().to_string(),
77            ));
78        }
79
80        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
81        let start_result = tokio::process::Command::new(&self.cli)
82            .args(["start", &container_id])
83            .output()
84            .await
85            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
86
87        if !start_result.status.success() {
88            self.remove_container(&container_id).await;
89            return Err(RuntimeError::ContainerStartFailed(format!(
90                "container start failed: {}",
91                String::from_utf8_lossy(&start_result.stderr).trim()
92            )));
93        }
94
95        let host_port = match self.lookup_port(&container_id, 6379).await {
96            Ok(host_port) => host_port,
97            Err(error) => {
98                self.remove_container(&container_id).await;
99                return Err(error);
100            }
101        };
102
103        if let Err(error) = self.wait_for_redis(host_port).await {
104            self.remove_container(&container_id).await;
105            return Err(error);
106        }
107
108        let running = RunningCacheContainer {
109            container_id,
110            host_port,
111        };
112        self.containers
113            .write()
114            .insert(resource_id.to_string(), running.clone());
115        Ok(running)
116    }
117
118    pub async fn stop_container(&self, resource_id: &str) {
119        let container = self.containers.write().remove(resource_id);
120        if let Some(container) = container {
121            self.remove_container(&container.container_id).await;
122        }
123    }
124
125    pub async fn stop_all(&self) {
126        let containers: Vec<String> = {
127            let mut containers = self.containers.write();
128            containers
129                .drain()
130                .map(|(_, container)| container.container_id)
131                .collect()
132        };
133        for container_id in containers {
134            self.remove_container(&container_id).await;
135        }
136    }
137
138    async fn lookup_port(
139        &self,
140        container_id: &str,
141        container_port: u16,
142    ) -> Result<u16, RuntimeError> {
143        let port_output = tokio::process::Command::new(&self.cli)
144            .args(["port", container_id, &container_port.to_string()])
145            .output()
146            .await
147            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
148
149        if !port_output.status.success() {
150            let stderr = String::from_utf8_lossy(&port_output.stderr);
151            return Err(RuntimeError::ContainerStartFailed(format!(
152                "port lookup failed: {stderr}"
153            )));
154        }
155
156        let port_str = String::from_utf8_lossy(&port_output.stdout);
157        port_str
158            .trim()
159            .rsplit(':')
160            .next()
161            .and_then(|value| value.parse::<u16>().ok())
162            .ok_or_else(|| {
163                RuntimeError::ContainerStartFailed(format!(
164                    "could not determine redis port from '{}'",
165                    port_str.trim()
166                ))
167            })
168    }
169
170    async fn wait_for_redis(&self, host_port: u16) -> Result<(), RuntimeError> {
171        for _ in 0..40 {
172            tokio::time::sleep(Duration::from_millis(500)).await;
173            if tokio::net::TcpStream::connect(format!("127.0.0.1:{host_port}"))
174                .await
175                .is_ok()
176            {
177                return Ok(());
178            }
179        }
180
181        Err(RuntimeError::ContainerStartFailed(
182            "redis container did not become ready within 20 seconds".to_string(),
183        ))
184    }
185
186    async fn remove_container(&self, container_id: &str) {
187        let _ = tokio::process::Command::new(&self.cli)
188            .args(["rm", "-f", container_id])
189            .output()
190            .await;
191    }
192}
193
/// Reports whether `cli` can be launched and its `info` subcommand exits
/// successfully.
///
/// Output of the probe is discarded. A failure to spawn the process counts as
/// "not available".
fn cli_available(cli: &str) -> bool {
    use std::process::{Command, Stdio};

    let mut probe = Command::new(cli);
    probe
        .arg("info")
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    matches!(probe.status(), Ok(exit) if exit.success())
}