Skip to main content

fakecloud_rds/
runtime.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use parking_lot::RwLock;
6use tokio_postgres::NoTls;
7
8const POSTGRES_DOCKERFILE: &str = include_str!("../assets/postgres/Dockerfile");
9const AWS_COMMONS_CONTROL: &str = include_str!("../assets/postgres/aws_commons.control");
10const AWS_COMMONS_SQL: &str = include_str!("../assets/postgres/aws_commons--1.1.sql");
11const AWS_COMMONS_UPGRADE_SQL: &str = include_str!("../assets/postgres/aws_commons--1.0--1.1.sql");
12const AWS_LAMBDA_CONTROL: &str = include_str!("../assets/postgres/aws_lambda.control");
13const AWS_LAMBDA_SQL: &str = include_str!("../assets/postgres/aws_lambda--1.0.sql");
14const AWS_S3_CONTROL: &str = include_str!("../assets/postgres/aws_s3.control");
15const AWS_S3_SQL: &str = include_str!("../assets/postgres/aws_s3--1.0.sql");
16
17const MYSQL_DOCKERFILE: &str = include_str!("../assets/mysql/Dockerfile");
18const MYSQL_UDF_C: &str = include_str!("../assets/mysql/fakecloud_udf.c");
19const MYSQL_BOOTSTRAP_SH: &str = include_str!("../assets/mysql/fakecloud-bootstrap.sh");
20const MYSQL_BOOTSTRAP_SQL: &str = include_str!("../assets/mysql/99-fakecloud-bootstrap.sql.tmpl");
21
22const MARIADB_DOCKERFILE: &str = include_str!("../assets/mariadb/Dockerfile");
23const MARIADB_UDF_C: &str = include_str!("../assets/mariadb/fakecloud_udf.c");
24const MARIADB_BOOTSTRAP_SH: &str = include_str!("../assets/mariadb/fakecloud-bootstrap.sh");
25const MARIADB_BOOTSTRAP_SQL: &str =
26    include_str!("../assets/mariadb/99-fakecloud-bootstrap.sql.tmpl");
27
/// Registry prefix for the prebuilt `fakecloud-postgres` images.
///
/// Release CI pushes `ghcr.io/faiscadev/fakecloud-postgres:<major>-<version>`
/// for every tag (workflow: `.github/workflows/docker-rds-images.yml`).
/// Point `FAKECLOUD_POSTGRES_REGISTRY` at a private mirror to override this
/// prefix, or set `FAKECLOUD_REBUILD_POSTGRES_IMAGE=1` to ignore published
/// tags and always build locally.
const DEFAULT_POSTGRES_REGISTRY: &str = "ghcr.io/faiscadev";
35
/// Handle to a database container started by `RdsRuntime`.
#[derive(Debug, Clone)]
pub struct RunningDbContainer {
    /// Container ID as printed by `<cli> create`.
    pub container_id: String,
    /// Host-side port mapped to the engine's listener port.
    pub host_port: u16,
}
41
42pub struct RdsRuntime {
43    cli: String,
44    containers: RwLock<HashMap<String, RunningDbContainer>>,
45    instance_id: String,
46    host_ip: String,
47    server_port: u16,
48    image_cache: RwLock<HashMap<String, Arc<tokio::sync::Mutex<bool>>>>,
49}
50
51#[derive(Debug, thiserror::Error)]
52pub enum RuntimeError {
53    #[error("container runtime is unavailable")]
54    Unavailable,
55    #[error("container failed to start: {0}")]
56    ContainerStartFailed(String),
57}
58
59impl RdsRuntime {
60    pub fn new(server_port: u16) -> Option<Self> {
61        let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
62            if cli_available(&cli) {
63                cli
64            } else {
65                return None;
66            }
67        } else if cli_available("docker") {
68            "docker".to_string()
69        } else if cli_available("podman") {
70            "podman".to_string()
71        } else {
72            return None;
73        };
74
75        // Match Lambda runtime container-to-host networking: Linux uses the
76        // bridge gateway IP directly, macOS/Windows use Docker Desktop's
77        // host-gateway alias. Containers reach fakecloud at host.docker.internal.
78        let host_ip = if cfg!(target_os = "linux") {
79            detect_bridge_gateway(&cli).unwrap_or_else(|| "172.17.0.1".to_string())
80        } else {
81            "host-gateway".to_string()
82        };
83
84        Some(Self {
85            cli,
86            containers: RwLock::new(HashMap::new()),
87            instance_id: format!("fakecloud-{}", std::process::id()),
88            host_ip,
89            server_port,
90            image_cache: RwLock::new(HashMap::new()),
91        })
92    }
93
94    pub fn cli_name(&self) -> &str {
95        &self.cli
96    }
97
98    #[allow(clippy::too_many_arguments)]
99    pub async fn ensure_postgres(
100        &self,
101        db_instance_identifier: &str,
102        engine: &str,
103        engine_version: &str,
104        username: &str,
105        password: &str,
106        db_name: &str,
107        account_id: &str,
108        region: &str,
109    ) -> Result<RunningDbContainer, RuntimeError> {
110        self.stop_container(db_instance_identifier).await;
111
112        // Determine Docker image and port based on engine. Postgres,
113        // MySQL, and MariaDB all use prebuilt fakecloud-* images that
114        // bake in the bridge UDFs / extensions and call back into the
115        // host fakecloud server; the heavier engines (oracle/mssql/db2)
116        // stay on upstream images. `bridge_engine_version` is `Some(_)`
117        // for the bridge-aware engines and gates the `--add-host`
118        // setup below.
119        let (image, port, env_vars, bridge_engine_version) = match engine {
120            "postgres" => {
121                let major_version = engine_version.split('.').next().unwrap_or("16");
122                let image = self.ensure_postgres_image(major_version).await?;
123                let env_vars = vec![
124                    format!("POSTGRES_USER={username}"),
125                    format!("POSTGRES_PASSWORD={password}"),
126                    format!("POSTGRES_DB={db_name}"),
127                    format!(
128                        "FAKECLOUD_ENDPOINT=http://host.docker.internal:{}",
129                        self.server_port
130                    ),
131                    format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
132                    format!("FAKECLOUD_REGION={region}"),
133                ];
134                (image, "5432", env_vars, Some(major_version.to_string()))
135            }
136            "mysql" => {
137                // 5.7 was dropped after Oracle community support ended
138                // (Oct 2023) — the image base no longer ships the build
139                // deps we need for the UDF. Any 5.7.* engine version
140                // resolves to 8.0.
141                let _ = engine_version;
142                let major_version = "8.0";
143                let image = self.ensure_mysql_image(major_version).await?;
144                let env_vars = vec![
145                    format!("MYSQL_ROOT_PASSWORD={password}"),
146                    format!("MYSQL_USER={username}"),
147                    format!("MYSQL_PASSWORD={password}"),
148                    format!("MYSQL_DATABASE={db_name}"),
149                    format!(
150                        "FAKECLOUD_ENDPOINT=http://host.docker.internal:{}",
151                        self.server_port
152                    ),
153                    format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
154                    format!("FAKECLOUD_REGION={region}"),
155                ];
156                (image, "3306", env_vars, Some(major_version.to_string()))
157            }
158            "mariadb" => {
159                let major_version = if engine_version.starts_with("10.11") {
160                    "10.11"
161                } else if engine_version.starts_with("11.4") {
162                    "11.4"
163                } else {
164                    "10.6"
165                };
166                let image = self.ensure_mariadb_image(major_version).await?;
167                let env_vars = vec![
168                    format!("MARIADB_ROOT_PASSWORD={password}"),
169                    format!("MARIADB_USER={username}"),
170                    format!("MARIADB_PASSWORD={password}"),
171                    format!("MARIADB_DATABASE={db_name}"),
172                    format!(
173                        "FAKECLOUD_ENDPOINT=http://host.docker.internal:{}",
174                        self.server_port
175                    ),
176                    format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
177                    format!("FAKECLOUD_REGION={region}"),
178                ];
179                (image, "3306", env_vars, Some(major_version.to_string()))
180            }
181            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
182                // Oracle Database Free is the no-cost dev edition shipped by
183                // Oracle. The container exposes a "FREEPDB1" pluggable
184                // database and creates the SYSTEM user with the password
185                // from ORACLE_PASSWORD.
186                let image = "gvenzl/oracle-free:23-slim".to_string();
187                let env_vars = vec![
188                    format!("ORACLE_PASSWORD={password}"),
189                    format!("APP_USER={username}"),
190                    format!("APP_USER_PASSWORD={password}"),
191                    format!("ORACLE_DATABASE={db_name}"),
192                ];
193                (image, "1521", env_vars, None)
194            }
195            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
196                // SQL Server Express is free for dev/test with no license
197                // ceiling. SA password must satisfy MSSQL's complexity
198                // requirements (>=8 chars, mix of classes); callers should
199                // supply one that does or the container will refuse to
200                // start.
201                let image = "mcr.microsoft.com/mssql/server:2022-latest".to_string();
202                let env_vars = vec![
203                    "ACCEPT_EULA=Y".to_string(),
204                    format!("MSSQL_SA_PASSWORD={password}"),
205                    "MSSQL_PID=Express".to_string(),
206                ];
207                (image, "1433", env_vars, None)
208            }
209            "db2-se" | "db2-ae" => {
210                // Db2 Community Edition is free under the standard IBM
211                // Community License. The container exposes a single
212                // database named after DBNAME, owned by db2inst1.
213                let image = "icr.io/db2_community/db2:latest".to_string();
214                let env_vars = vec![
215                    "LICENSE=accept".to_string(),
216                    "DB2INSTANCE=db2inst1".to_string(),
217                    format!("DB2INST1_PASSWORD={password}"),
218                    format!("DBNAME={db_name}"),
219                ];
220                (image, "50000", env_vars, None)
221            }
222            _ => {
223                return Err(RuntimeError::ContainerStartFailed(format!(
224                    "Unsupported engine: {}",
225                    engine
226                )))
227            }
228        };
229
230        // Db2 needs --privileged to set kernel parameters during startup.
231        let needs_privileged = matches!(engine, "db2-se" | "db2-ae");
232
233        // Build container create args
234        let mut args = vec![
235            "create".to_string(),
236            "-p".to_string(),
237            format!(":{}", port),
238            "--label".to_string(),
239            format!("fakecloud-rds={db_instance_identifier}"),
240            "--label".to_string(),
241            format!("fakecloud-instance={}", self.instance_id),
242        ];
243
244        if needs_privileged {
245            args.push("--privileged".to_string());
246        }
247
248        // Bridge-aware engines (postgres aws_lambda, mysql/mariadb
249        // fakecloud_post UDF) call back into fakecloud over HTTP. Wire
250        // the host gateway alias so the in-container code can resolve
251        // host.docker.internal on every platform.
252        if bridge_engine_version.is_some() {
253            args.push("--add-host".to_string());
254            args.push(format!("host.docker.internal:{}", self.host_ip));
255        }
256
257        for env_var in env_vars {
258            args.push("-e".to_string());
259            args.push(env_var);
260        }
261
262        args.push(image);
263
264        let output = tokio::process::Command::new(&self.cli)
265            .args(&args)
266            .output()
267            .await
268            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
269
270        if !output.status.success() {
271            return Err(RuntimeError::ContainerStartFailed(
272                String::from_utf8_lossy(&output.stderr).trim().to_string(),
273            ));
274        }
275
276        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
277        let start_result = tokio::process::Command::new(&self.cli)
278            .args(["start", &container_id])
279            .output()
280            .await
281            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
282
283        if !start_result.status.success() {
284            self.remove_container(&container_id).await;
285            return Err(RuntimeError::ContainerStartFailed(format!(
286                "container start failed: {}",
287                String::from_utf8_lossy(&start_result.stderr).trim()
288            )));
289        }
290
291        let host_port = match self.lookup_port(&container_id, port).await {
292            Ok(host_port) => host_port,
293            Err(error) => {
294                self.remove_container(&container_id).await;
295                return Err(error);
296            }
297        };
298
299        // Wait for database to be ready
300        let wait_result = match engine {
301            "postgres" => {
302                self.wait_for_postgres(username, password, db_name, host_port)
303                    .await
304            }
305            "mysql" | "mariadb" => {
306                self.wait_for_mysql(username, password, db_name, host_port)
307                    .await
308            }
309            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
310                self.wait_for_oracle(&container_id, host_port).await
311            }
312            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
313                self.wait_for_sqlserver(&container_id, host_port).await
314            }
315            "db2-se" | "db2-ae" => self.wait_for_db2(&container_id, host_port).await,
316            _ => unreachable!("engine already validated"),
317        };
318
319        if let Err(error) = wait_result {
320            self.remove_container(&container_id).await;
321            return Err(error);
322        }
323
324        let running = RunningDbContainer {
325            container_id,
326            host_port,
327        };
328        self.containers
329            .write()
330            .insert(db_instance_identifier.to_string(), running.clone());
331        Ok(running)
332    }
333
334    pub async fn stop_container(&self, db_instance_identifier: &str) {
335        let container = self.containers.write().remove(db_instance_identifier);
336        if let Some(container) = container {
337            self.remove_container(&container.container_id).await;
338        }
339    }
340
341    pub async fn restart_container(
342        &self,
343        db_instance_identifier: &str,
344        engine: &str,
345        username: &str,
346        password: &str,
347        db_name: &str,
348    ) -> Result<RunningDbContainer, RuntimeError> {
349        let running = self
350            .containers
351            .read()
352            .get(db_instance_identifier)
353            .cloned()
354            .ok_or(RuntimeError::Unavailable)?;
355
356        let output = tokio::process::Command::new(&self.cli)
357            .args(["restart", &running.container_id])
358            .output()
359            .await
360            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
361
362        if !output.status.success() {
363            return Err(RuntimeError::ContainerStartFailed(format!(
364                "container restart failed: {}",
365                String::from_utf8_lossy(&output.stderr).trim()
366            )));
367        }
368
369        let port = match engine {
370            "postgres" => "5432",
371            "mysql" | "mariadb" => "3306",
372            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "1521",
373            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "1433",
374            "db2-se" | "db2-ae" => "50000",
375            _ => "5432", // fallback
376        };
377
378        let host_port = self.lookup_port(&running.container_id, port).await?;
379
380        match engine {
381            "postgres" => {
382                self.wait_for_postgres(username, password, db_name, host_port)
383                    .await?
384            }
385            "mysql" | "mariadb" => {
386                self.wait_for_mysql(username, password, db_name, host_port)
387                    .await?
388            }
389            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
390                self.wait_for_oracle(&running.container_id, host_port)
391                    .await?
392            }
393            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
394                self.wait_for_sqlserver(&running.container_id, host_port)
395                    .await?
396            }
397            "db2-se" | "db2-ae" => self.wait_for_db2(&running.container_id, host_port).await?,
398            _ => {
399                self.wait_for_postgres(username, password, db_name, host_port)
400                    .await?
401            }
402        };
403        let running = RunningDbContainer {
404            container_id: running.container_id,
405            host_port,
406        };
407        self.containers
408            .write()
409            .insert(db_instance_identifier.to_string(), running.clone());
410        Ok(running)
411    }
412
413    pub async fn stop_all(&self) {
414        let containers: Vec<String> = {
415            let mut containers = self.containers.write();
416            containers
417                .drain()
418                .map(|(_, container)| container.container_id)
419                .collect()
420        };
421        for container_id in containers {
422            self.remove_container(&container_id).await;
423        }
424    }
425
426    async fn lookup_port(&self, container_id: &str, port: &str) -> Result<u16, RuntimeError> {
427        let port_output = tokio::process::Command::new(&self.cli)
428            .args(["port", container_id, port])
429            .output()
430            .await
431            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
432
433        let port_str = String::from_utf8_lossy(&port_output.stdout);
434        port_str
435            .trim()
436            .rsplit(':')
437            .next()
438            .and_then(|value| value.parse::<u16>().ok())
439            .ok_or_else(|| {
440                RuntimeError::ContainerStartFailed(format!(
441                    "could not determine container port from '{}'",
442                    port_str.trim()
443                ))
444            })
445    }
446
447    async fn wait_for_postgres(
448        &self,
449        username: &str,
450        password: &str,
451        db_name: &str,
452        host_port: u16,
453    ) -> Result<(), RuntimeError> {
454        for _ in 0..40 {
455            tokio::time::sleep(Duration::from_millis(500)).await;
456            let connection_string = format!(
457                "host=127.0.0.1 port={host_port} user={username} password={password} dbname={db_name}"
458            );
459            if let Ok((client, connection)) =
460                tokio_postgres::connect(&connection_string, NoTls).await
461            {
462                tokio::spawn(async move {
463                    let _ = connection.await;
464                });
465                if client.simple_query("SELECT 1").await.is_ok() {
466                    return Ok(());
467                }
468            }
469        }
470
471        Err(RuntimeError::ContainerStartFailed(
472            "postgres container did not become ready within 20 seconds".to_string(),
473        ))
474    }
475
476    async fn wait_for_mysql(
477        &self,
478        username: &str,
479        password: &str,
480        db_name: &str,
481        host_port: u16,
482    ) -> Result<(), RuntimeError> {
483        use mysql_async::prelude::*;
484        use mysql_async::OptsBuilder;
485
486        for attempt in 1..=40 {
487            let opts = OptsBuilder::default()
488                .ip_or_hostname("127.0.0.1")
489                .tcp_port(host_port)
490                .user(Some(username))
491                .pass(Some(password))
492                .db_name(Some(db_name));
493
494            match mysql_async::Conn::new(opts).await {
495                Ok(mut conn) => {
496                    if conn.query_drop("SELECT 1").await.is_ok() {
497                        let _ = conn.disconnect().await;
498                        return Ok(());
499                    }
500                }
501                Err(_) => {
502                    if attempt < 40 {
503                        tokio::time::sleep(Duration::from_millis(500)).await;
504                    }
505                    continue;
506                }
507            }
508        }
509
510        Err(RuntimeError::ContainerStartFailed(
511            "MySQL/MariaDB container did not become ready within 20 seconds".to_string(),
512        ))
513    }
514
515    /// Wait for Oracle Database Free to finish bootstrapping. The
516    /// `gvenzl/oracle-free` image prints `DATABASE IS READY TO USE!`
517    /// to stdout once the listener accepts connections, so we poll
518    /// `docker logs` until that marker appears (or the deadline elapses).
519    /// Oracle XE/Free typically takes 30-90 seconds on first start.
520    async fn wait_for_oracle(
521        &self,
522        container_id: &str,
523        host_port: u16,
524    ) -> Result<(), RuntimeError> {
525        self.wait_for_log_marker(container_id, "DATABASE IS READY TO USE!", 240)
526            .await?;
527        self.wait_for_tcp(host_port, 30).await
528    }
529
530    /// Wait for SQL Server to be ready. The official mssql/server image
531    /// emits `SQL Server is now ready for client connections.` once it
532    /// accepts TCP connections on 1433.
533    async fn wait_for_sqlserver(
534        &self,
535        container_id: &str,
536        host_port: u16,
537    ) -> Result<(), RuntimeError> {
538        self.wait_for_log_marker(
539            container_id,
540            "SQL Server is now ready for client connections",
541            180,
542        )
543        .await?;
544        self.wait_for_tcp(host_port, 30).await
545    }
546
547    /// Wait for Db2 Community Edition to finish setup. The
548    /// `icr.io/db2_community/db2` image prints `Setup has completed.`
549    /// once the instance is up and the database has been created.
550    async fn wait_for_db2(&self, container_id: &str, host_port: u16) -> Result<(), RuntimeError> {
551        self.wait_for_log_marker(container_id, "Setup has completed", 360)
552            .await?;
553        self.wait_for_tcp(host_port, 60).await
554    }
555
556    /// Poll `docker logs <container>` until the supplied marker appears
557    /// in stdout or stderr. `deadline_secs` caps total wait.
558    async fn wait_for_log_marker(
559        &self,
560        container_id: &str,
561        marker: &str,
562        deadline_secs: u64,
563    ) -> Result<(), RuntimeError> {
564        let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
565        while std::time::Instant::now() < deadline {
566            let output = tokio::process::Command::new(&self.cli)
567                .args(["logs", container_id])
568                .output()
569                .await
570                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
571            let stdout = String::from_utf8_lossy(&output.stdout);
572            let stderr = String::from_utf8_lossy(&output.stderr);
573            if stdout.contains(marker) || stderr.contains(marker) {
574                return Ok(());
575            }
576            tokio::time::sleep(Duration::from_secs(2)).await;
577        }
578        Err(RuntimeError::ContainerStartFailed(format!(
579            "container did not log '{}' within {} seconds",
580            marker, deadline_secs
581        )))
582    }
583
584    /// TCP-probe the host port until it accepts a connection or the
585    /// deadline elapses. Use after `wait_for_log_marker` since the
586    /// listener may bind a moment after the readiness log line.
587    async fn wait_for_tcp(&self, host_port: u16, deadline_secs: u64) -> Result<(), RuntimeError> {
588        let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
589        while std::time::Instant::now() < deadline {
590            if tokio::net::TcpStream::connect(("127.0.0.1", host_port))
591                .await
592                .is_ok()
593            {
594                return Ok(());
595            }
596            tokio::time::sleep(Duration::from_millis(500)).await;
597        }
598        Err(RuntimeError::ContainerStartFailed(format!(
599            "TCP probe to 127.0.0.1:{} did not succeed within {}s",
600            host_port, deadline_secs
601        )))
602    }
603
604    async fn remove_container(&self, container_id: &str) {
605        let _ = tokio::process::Command::new(&self.cli)
606            .args(["rm", "-f", container_id])
607            .output()
608            .await;
609    }
610
611    /// Build (or reuse) the fakecloud-postgres image for a given major
612    /// version. The image bakes plpython3u plus the aws_commons and
613    /// aws_lambda extension files so users can run
614    /// `CREATE EXTENSION aws_lambda CASCADE` inside any database.
615    /// Tag includes a content hash so changes to the embedded assets
616    /// invalidate the local cache automatically.
617    /// Resolve the postgres image tag for a given major version. Tries
618    /// (in order): in-process cache, `docker image inspect` for a copy
619    /// already on the daemon, `docker pull` of the prebuilt image
620    /// published by CI, and finally a local `docker build` from the
621    /// embedded Dockerfile + extension assets. The pull path is the
622    /// happy path for end users on tagged releases; the build path
623    /// covers dev / unreleased versions / airgapped setups.
624    ///
625    /// Honors:
626    /// - `FAKECLOUD_POSTGRES_REGISTRY` — registry prefix (default
627    ///   `ghcr.io/faiscadev`); useful for private mirrors.
628    /// - `FAKECLOUD_REBUILD_POSTGRES_IMAGE` — when set to a non-empty
629    ///   value, skip inspect + pull and force a fresh local build.
630    ///   Use after editing the embedded Dockerfile or extension SQL.
631    pub(crate) async fn ensure_postgres_image(
632        &self,
633        major_version: &str,
634    ) -> Result<String, RuntimeError> {
635        let tag = bridge_image_tag("fakecloud-postgres", major_version);
636        self.ensure_bridge_image(&tag, |tag| async move {
637            self.build_postgres_image_local(major_version, &tag).await
638        })
639        .await
640    }
641
642    async fn docker_image_exists(&self, tag: &str) -> bool {
643        tokio::process::Command::new(&self.cli)
644            .args(["image", "inspect", tag])
645            .stdout(std::process::Stdio::null())
646            .stderr(std::process::Stdio::null())
647            .status()
648            .await
649            .map(|status| status.success())
650            .unwrap_or(false)
651    }
652
653    async fn try_pull_image(&self, tag: &str) -> bool {
654        tracing::info!(tag = %tag, "Pulling prebuilt fakecloud-postgres image");
655        let output = match tokio::process::Command::new(&self.cli)
656            .args(["pull", tag])
657            .output()
658            .await
659        {
660            Ok(output) => output,
661            Err(e) => {
662                tracing::debug!(tag = %tag, error = %e, "docker pull failed to spawn");
663                return false;
664            }
665        };
666        if output.status.success() {
667            return true;
668        }
669        tracing::info!(
670            tag = %tag,
671            stderr = %String::from_utf8_lossy(&output.stderr).trim(),
672            "Prebuilt postgres image not available, falling back to local build"
673        );
674        false
675    }
676
677    async fn build_postgres_image_local(
678        &self,
679        major_version: &str,
680        tag: &str,
681    ) -> Result<(), RuntimeError> {
682        let assets: [(&str, &str); 8] = [
683            ("Dockerfile", POSTGRES_DOCKERFILE),
684            ("aws_commons.control", AWS_COMMONS_CONTROL),
685            ("aws_commons--1.1.sql", AWS_COMMONS_SQL),
686            ("aws_commons--1.0--1.1.sql", AWS_COMMONS_UPGRADE_SQL),
687            ("aws_lambda.control", AWS_LAMBDA_CONTROL),
688            ("aws_lambda--1.0.sql", AWS_LAMBDA_SQL),
689            ("aws_s3.control", AWS_S3_CONTROL),
690            ("aws_s3--1.0.sql", AWS_S3_SQL),
691        ];
692        self.build_image_local(
693            tag,
694            &assets,
695            &format!("PG_VERSION={major_version}"),
696            "fakecloud-postgres",
697        )
698        .await
699    }
700
701    /// Pull-first / build-fallback for the prebuilt fakecloud-mysql
702    /// image. Mirrors `ensure_postgres_image`. The image bakes a small
703    /// libcurl-backed UDF + Aurora-compatible `mysql.lambda_async` /
704    /// `mysql.lambda_sync` stored procedures.
705    pub(crate) async fn ensure_mysql_image(
706        &self,
707        major_version: &str,
708    ) -> Result<String, RuntimeError> {
709        let tag = bridge_image_tag("fakecloud-mysql", major_version);
710        self.ensure_bridge_image(&tag, |tag| async move {
711            self.build_mysql_image_local(major_version, &tag).await
712        })
713        .await
714    }
715
716    pub(crate) async fn ensure_mariadb_image(
717        &self,
718        major_version: &str,
719    ) -> Result<String, RuntimeError> {
720        let tag = bridge_image_tag("fakecloud-mariadb", major_version);
721        self.ensure_bridge_image(&tag, |tag| async move {
722            self.build_mariadb_image_local(major_version, &tag).await
723        })
724        .await
725    }
726
727    /// Shared pull-first/build-fallback orchestration used by every
728    /// bridge-aware engine. Holds the per-tag mutex, checks the local
729    /// daemon first, then tries the prebuilt image, and finally
730    /// invokes the supplied local-build closure.
731    async fn ensure_bridge_image<F, Fut>(
732        &self,
733        tag: &str,
734        build_local: F,
735    ) -> Result<String, RuntimeError>
736    where
737        F: FnOnce(String) -> Fut,
738        Fut: std::future::Future<Output = Result<(), RuntimeError>>,
739    {
740        let lock = {
741            let mut cache = self.image_cache.write();
742            cache
743                .entry(tag.to_string())
744                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(false)))
745                .clone()
746        };
747        let mut resolved = lock.lock().await;
748        if *resolved {
749            return Ok(tag.to_string());
750        }
751
752        let force_rebuild = std::env::var("FAKECLOUD_REBUILD_POSTGRES_IMAGE")
753            .map(|v| !v.is_empty())
754            .unwrap_or(false);
755
756        if !force_rebuild {
757            if self.docker_image_exists(tag).await {
758                *resolved = true;
759                return Ok(tag.to_string());
760            }
761            if self.try_pull_image(tag).await {
762                *resolved = true;
763                return Ok(tag.to_string());
764            }
765        }
766
767        build_local(tag.to_string()).await?;
768        *resolved = true;
769        Ok(tag.to_string())
770    }
771
772    async fn build_mysql_image_local(
773        &self,
774        major_version: &str,
775        tag: &str,
776    ) -> Result<(), RuntimeError> {
777        let assets: [(&str, &str); 4] = [
778            ("Dockerfile", MYSQL_DOCKERFILE),
779            ("fakecloud_udf.c", MYSQL_UDF_C),
780            ("fakecloud-bootstrap.sh", MYSQL_BOOTSTRAP_SH),
781            ("99-fakecloud-bootstrap.sql.tmpl", MYSQL_BOOTSTRAP_SQL),
782        ];
783        self.build_image_local(
784            tag,
785            &assets,
786            &format!("MYSQL_VERSION={major_version}"),
787            "fakecloud-mysql",
788        )
789        .await
790    }
791
792    async fn build_mariadb_image_local(
793        &self,
794        major_version: &str,
795        tag: &str,
796    ) -> Result<(), RuntimeError> {
797        let assets: [(&str, &str); 4] = [
798            ("Dockerfile", MARIADB_DOCKERFILE),
799            ("fakecloud_udf.c", MARIADB_UDF_C),
800            ("fakecloud-bootstrap.sh", MARIADB_BOOTSTRAP_SH),
801            ("99-fakecloud-bootstrap.sql.tmpl", MARIADB_BOOTSTRAP_SQL),
802        ];
803        self.build_image_local(
804            tag,
805            &assets,
806            &format!("MARIADB_VERSION={major_version}"),
807            "fakecloud-mariadb",
808        )
809        .await
810    }
811
812    async fn build_image_local(
813        &self,
814        tag: &str,
815        assets: &[(&str, &str)],
816        build_arg: &str,
817        image_label: &str,
818    ) -> Result<(), RuntimeError> {
819        let build_dir =
820            tempfile::tempdir().map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
821        for (name, contents) in assets {
822            tokio::fs::write(build_dir.path().join(name), contents)
823                .await
824                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
825        }
826
827        tracing::info!(
828            tag = %tag,
829            image = %image_label,
830            "Building {image_label} image locally (first use can take ~60s)"
831        );
832
833        let output = tokio::process::Command::new(&self.cli)
834            .args(["build", "--build-arg", build_arg, "-t", tag, "."])
835            .current_dir(build_dir.path())
836            .output()
837            .await
838            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
839
840        if !output.status.success() {
841            return Err(RuntimeError::ContainerStartFailed(format!(
842                "docker build for {} failed: {}",
843                tag,
844                String::from_utf8_lossy(&output.stderr).trim()
845            )));
846        }
847
848        Ok(())
849    }
850
851    pub async fn dump_database(
852        &self,
853        db_instance_identifier: &str,
854        engine: &str,
855        username: &str,
856        password: &str,
857        db_name: &str,
858    ) -> Result<Vec<u8>, RuntimeError> {
859        let container = self
860            .containers
861            .read()
862            .get(db_instance_identifier)
863            .cloned()
864            .ok_or(RuntimeError::Unavailable)?;
865
866        let args: Vec<String> = match engine {
867            "mysql" | "mariadb" => vec![
868                "exec".into(),
869                container.container_id.clone(),
870                "mysqldump".into(),
871                "-u".into(),
872                username.into(),
873                format!("-p{password}"),
874                db_name.into(),
875            ],
876            "postgres" => vec![
877                "exec".into(),
878                container.container_id.clone(),
879                "pg_dump".into(),
880                "-U".into(),
881                username.into(),
882                "-d".into(),
883                db_name.into(),
884                "--no-password".into(),
885            ],
886            // Heavy engines don't ship with a portable dump CLI we can
887            // shell out to from the host the same way pg_dump and
888            // mysqldump are guaranteed available. Surface a clear
889            // error so callers (snapshot/read-replica) don't silently
890            // run the wrong tool against an Oracle/SQL Server/Db2
891            // container.
892            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
893            | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
894                return Err(RuntimeError::ContainerStartFailed(format!(
895                    "engine {engine} is not yet supported by the snapshot/read-replica path; \
896                     emulator stores the API state but cannot dump the underlying database"
897                )));
898            }
899            other => {
900                return Err(RuntimeError::ContainerStartFailed(format!(
901                    "engine {other} is not supported by dump_database"
902                )));
903            }
904        };
905
906        let output = tokio::process::Command::new(&self.cli)
907            .args(&args)
908            .output()
909            .await
910            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
911
912        if !output.status.success() {
913            return Err(RuntimeError::ContainerStartFailed(format!(
914                "dump failed: {}",
915                String::from_utf8_lossy(&output.stderr).trim()
916            )));
917        }
918
919        Ok(output.stdout)
920    }
921
922    /// Cat a file inside the running container by absolute path.
923    /// Returns `Unavailable` when the runtime/daemon is missing or the
924    /// instance has no live container; returns `ContainerStartFailed`
925    /// when the file is missing or unreadable. Callers map those to the
926    /// AWS-shaped responses.
927    pub async fn read_log_file(
928        &self,
929        db_instance_identifier: &str,
930        container_path: &str,
931    ) -> Result<Vec<u8>, RuntimeError> {
932        let container = self
933            .containers
934            .read()
935            .get(db_instance_identifier)
936            .cloned()
937            .ok_or(RuntimeError::Unavailable)?;
938
939        let output = tokio::process::Command::new(&self.cli)
940            .args(["exec", &container.container_id, "cat", container_path])
941            .output()
942            .await
943            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
944
945        if !output.status.success() {
946            return Err(RuntimeError::ContainerStartFailed(format!(
947                "cat {container_path} failed: {}",
948                String::from_utf8_lossy(&output.stderr).trim()
949            )));
950        }
951
952        Ok(output.stdout)
953    }
954
955    pub async fn restore_database(
956        &self,
957        db_instance_identifier: &str,
958        engine: &str,
959        username: &str,
960        password: &str,
961        db_name: &str,
962        dump_data: &[u8],
963    ) -> Result<(), RuntimeError> {
964        let container = self
965            .containers
966            .read()
967            .get(db_instance_identifier)
968            .cloned()
969            .ok_or(RuntimeError::Unavailable)?;
970
971        let args: Vec<String> = match engine {
972            "mysql" | "mariadb" => vec![
973                "exec".into(),
974                "-i".into(),
975                container.container_id.clone(),
976                "mysql".into(),
977                "-u".into(),
978                username.into(),
979                format!("-p{password}"),
980                db_name.into(),
981            ],
982            "postgres" => vec![
983                "exec".into(),
984                "-i".into(),
985                container.container_id.clone(),
986                "psql".into(),
987                "-U".into(),
988                username.into(),
989                "-d".into(),
990                db_name.into(),
991                "--no-password".into(),
992                "-v".into(),
993                "ON_ERROR_STOP=1".into(),
994            ],
995            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
996            | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
997                return Err(RuntimeError::ContainerStartFailed(format!(
998                    "engine {engine} is not yet supported by the snapshot-restore path"
999                )));
1000            }
1001            other => {
1002                return Err(RuntimeError::ContainerStartFailed(format!(
1003                    "engine {other} is not supported by restore_database"
1004                )));
1005            }
1006        };
1007
1008        let mut child = tokio::process::Command::new(&self.cli)
1009            .args(&args)
1010            .stdin(std::process::Stdio::piped())
1011            .stdout(std::process::Stdio::piped())
1012            .stderr(std::process::Stdio::piped())
1013            .spawn()
1014            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1015
1016        if let Some(mut stdin) = child.stdin.take() {
1017            use tokio::io::AsyncWriteExt;
1018            stdin
1019                .write_all(dump_data)
1020                .await
1021                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1022            drop(stdin);
1023        }
1024
1025        let output = child
1026            .wait_with_output()
1027            .await
1028            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1029
1030        if !output.status.success() {
1031            return Err(RuntimeError::ContainerStartFailed(format!(
1032                "restore failed: {}",
1033                String::from_utf8_lossy(&output.stderr).trim()
1034            )));
1035        }
1036
1037        Ok(())
1038    }
1039}
1040
/// Probe whether `cli` is usable by running `<cli> info` with stdout and
/// stderr discarded. Returns `true` only when the command spawns and exits
/// successfully; any spawn error or non-zero exit yields `false`.
fn cli_available(cli: &str) -> bool {
    use std::process::{Command, Stdio};

    match Command::new(cli)
        .arg("info")
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status()
    {
        Ok(exit) => exit.success(),
        Err(_) => false,
    }
}
1050
/// On Linux Docker bridge networks, ask the daemon for the bridge
/// gateway IP so containers can reach the host's loopback. macOS and
/// Windows use the magic `host-gateway` alias instead.
///
/// The format template prints each IPAM config entry's gateway on its own
/// line. A dual-stack bridge has both an IPv4 and an IPv6 entry, and gluing
/// them together without a separator would yield an unusable string such as
/// `172.17.0.1fd00::1`. The first line that parses as an IPv4 address is
/// therefore returned verbatim.
///
/// Returns `None` when the CLI cannot be spawned, exits non-zero, or reports
/// no IPv4 gateway.
fn detect_bridge_gateway(cli: &str) -> Option<String> {
    let output = std::process::Command::new(cli)
        .args([
            "network",
            "inspect",
            "bridge",
            "--format",
            "{{range .IPAM.Config}}{{println .Gateway}}{{end}}",
        ])
        .output()
        .ok()?;
    if !output.status.success() {
        return None;
    }
    String::from_utf8_lossy(&output.stdout)
        .lines()
        .map(str::trim)
        .find(|line| line.parse::<std::net::Ipv4Addr>().is_ok())
        .map(str::to_string)
}
1074
1075/// Build the prebuilt-image reference for a given engine + major
1076/// version. Uses `<registry>/<image>:<major>-<fakecloud-version>`,
1077/// where the registry comes from `FAKECLOUD_POSTGRES_REGISTRY` (kept
1078/// historical name; defaults to the public `ghcr.io/faiscadev`).
1079/// The version pin guarantees the runtime asks the daemon for the
1080/// same image CI publishes for this fakecloud release; mismatched
1081/// assets force a local rebuild via the fall-through in
1082/// `ensure_bridge_image`.
1083fn bridge_image_tag(image: &str, major_version: &str) -> String {
1084    let registry = std::env::var("FAKECLOUD_POSTGRES_REGISTRY")
1085        .unwrap_or_else(|_| DEFAULT_POSTGRES_REGISTRY.to_string());
1086    let registry = registry.trim_end_matches('/');
1087    format!(
1088        "{}/{}:{}-{}",
1089        registry,
1090        image,
1091        major_version,
1092        env!("CARGO_PKG_VERSION")
1093    )
1094}
1095
#[cfg(test)]
mod tests {
    use super::*;

    /// Every registry scenario lives in this one test (rather than one test
    /// per case) so the cases run strictly in order: `bridge_image_tag` reads
    /// a process-global env var, and parallel `cargo test` workers would
    /// otherwise race over it.
    #[test]
    fn bridge_image_tag_resolves_registry_overrides() {
        const REGISTRY_VAR: &str = "FAKECLOUD_POSTGRES_REGISTRY";
        let original_value = std::env::var(REGISTRY_VAR).ok();
        let release = env!("CARGO_PKG_VERSION");

        // (override to install or `None` to unset, image, major version,
        //  expected reference without the `-<release>` suffix)
        let cases = [
            (
                None,
                "fakecloud-postgres",
                "16",
                "ghcr.io/faiscadev/fakecloud-postgres:16",
            ),
            (
                None,
                "fakecloud-mysql",
                "8.0",
                "ghcr.io/faiscadev/fakecloud-mysql:8.0",
            ),
            (
                None,
                "fakecloud-mariadb",
                "10.11",
                "ghcr.io/faiscadev/fakecloud-mariadb:10.11",
            ),
            (
                Some("registry.example.com/team"),
                "fakecloud-postgres",
                "15",
                "registry.example.com/team/fakecloud-postgres:15",
            ),
            (
                Some("registry.example.com/team/"),
                "fakecloud-postgres",
                "13",
                "registry.example.com/team/fakecloud-postgres:13",
            ),
        ];

        for (override_value, image, major, expected_base) in cases {
            match override_value {
                Some(value) => std::env::set_var(REGISTRY_VAR, value),
                None => std::env::remove_var(REGISTRY_VAR),
            }
            assert_eq!(
                bridge_image_tag(image, major),
                format!("{expected_base}-{release}")
            );
        }

        match original_value {
            Some(value) => std::env::set_var(REGISTRY_VAR, value),
            None => std::env::remove_var(REGISTRY_VAR),
        }
    }
}