Skip to main content

fakecloud_rds/runtime/
mod.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fakecloud_core::container_net::{detect_container_cli, HostNetworking};
6use parking_lot::RwLock;
7use tokio_postgres::NoTls;
8
9mod k8s;
10
// Build context for the local fakecloud-postgres image, embedded at compile
// time: the Dockerfile plus the control/SQL files for the `aws_commons`,
// `aws_lambda`, and `aws_s3` extensions.
const POSTGRES_DOCKERFILE: &str = include_str!("../../assets/postgres/Dockerfile");
const AWS_COMMONS_CONTROL: &str = include_str!("../../assets/postgres/aws_commons.control");
const AWS_COMMONS_SQL: &str = include_str!("../../assets/postgres/aws_commons--1.1.sql");
const AWS_COMMONS_UPGRADE_SQL: &str =
    include_str!("../../assets/postgres/aws_commons--1.0--1.1.sql");
const AWS_LAMBDA_CONTROL: &str = include_str!("../../assets/postgres/aws_lambda.control");
const AWS_LAMBDA_SQL: &str = include_str!("../../assets/postgres/aws_lambda--1.0.sql");
const AWS_S3_CONTROL: &str = include_str!("../../assets/postgres/aws_s3.control");
const AWS_S3_SQL: &str = include_str!("../../assets/postgres/aws_s3--1.0.sql");

// Embedded MySQL assets: Dockerfile, UDF C source, and bootstrap script /
// SQL template (presumably the build context for the fakecloud MySQL image;
// the consumer is outside this section).
const MYSQL_DOCKERFILE: &str = include_str!("../../assets/mysql/Dockerfile");
const MYSQL_UDF_C: &str = include_str!("../../assets/mysql/fakecloud_udf.c");
const MYSQL_BOOTSTRAP_SH: &str = include_str!("../../assets/mysql/fakecloud-bootstrap.sh");
const MYSQL_BOOTSTRAP_SQL: &str =
    include_str!("../../assets/mysql/99-fakecloud-bootstrap.sql.tmpl");

// Embedded MariaDB assets, mirroring the MySQL set above.
const MARIADB_DOCKERFILE: &str = include_str!("../../assets/mariadb/Dockerfile");
const MARIADB_UDF_C: &str = include_str!("../../assets/mariadb/fakecloud_udf.c");
const MARIADB_BOOTSTRAP_SH: &str = include_str!("../../assets/mariadb/fakecloud-bootstrap.sh");
const MARIADB_BOOTSTRAP_SQL: &str =
    include_str!("../../assets/mariadb/99-fakecloud-bootstrap.sql.tmpl");

/// Default registry that hosts the prebuilt postgres images. CI publishes
/// to `ghcr.io/faiscadev/fakecloud-postgres:<major>-<version>` on each
/// release tag (see `.github/workflows/docker-rds-images.yml`).
/// Override with the `FAKECLOUD_POSTGRES_REGISTRY` env var (e.g. for
/// private mirrors); set `FAKECLOUD_REBUILD_POSTGRES_IMAGE=1` to force
/// a local rebuild even when the published tag is reachable.
const DEFAULT_POSTGRES_REGISTRY: &str = "ghcr.io/faiscadev";
40
/// Handle and connection details for one running database instance, as
/// tracked by `RdsRuntime` (keyed there by DB instance identifier).
///
/// On the Docker/Podman path `host_port` and `endpoint_port` carry the same
/// published port; how the k8s backend fills them is decided by `K8sDb`.
#[derive(Debug, Clone)]
pub struct RunningDbContainer {
    /// Backend-specific handle: a Docker container id, or a Pod name.
    pub container_id: String,
    /// Host-published port (Docker) or the engine's in-Pod port (k8s).
    pub host_port: u16,
    /// Address clients (and fakecloud's own readiness/restore) connect to:
    /// `127.0.0.1` for Docker, the Pod IP for k8s.
    pub endpoint_address: String,
    /// Port clients connect to: the published host port for Docker, the
    /// engine's standard port for k8s.
    pub endpoint_port: u16,
}
54
/// Runtime that provisions database engines for RDS instances, either via a
/// local container CLI (Docker/Podman) or via the Kubernetes backend.
pub struct RdsRuntime {
    /// Container CLI binary used to spawn `create`/`start`/`rm`/... commands.
    /// Left empty on the k8s backend.
    cli: String,
    /// Running databases keyed by DB instance identifier.
    containers: RwLock<HashMap<String, RunningDbContainer>>,
    /// `fakecloud-<pid>` identifier of this process; attached to created
    /// containers as the `fakecloud-instance` label.
    instance_id: String,
    /// Container-to-host networking resolved from the shared
    /// `fakecloud-core::container_net` helper (issue #1539): the host alias
    /// the spawned DB container uses to reach fakecloud, the `--add-host`
    /// flag (None under podman), and the sibling address fakecloud uses to
    /// reach the DB it just spawned. Unused on the k8s backend.
    net: HostNetworking,
    /// fakecloud's own port, embedded in the `FAKECLOUD_ENDPOINT` env var
    /// handed to bridge-aware engine containers.
    server_port: u16,
    /// Per-key async mutexes guarding image resolution.
    /// NOTE(review): presumably keyed by image tag with the `bool` marking
    /// "already resolved"; the code using it is outside this section — confirm.
    image_cache: RwLock<HashMap<String, Arc<tokio::sync::Mutex<bool>>>>,
    /// `Some` when running on the Kubernetes backend; the public methods
    /// dispatch to it and the Docker fields above go unused. `None` is the
    /// default Docker/Podman backend.
    k8s: Option<k8s::K8sDb>,
}
72
/// Errors returned by [`RdsRuntime`] operations.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
    /// No usable container is available for the request; e.g. returned by
    /// `restart_container` when the instance is not tracked.
    #[error("container runtime is unavailable")]
    Unavailable,
    /// Creating, starting, or waiting for a database container failed; the
    /// payload carries the CLI stderr or a description of the failure.
    #[error("container failed to start: {0}")]
    ContainerStartFailed(String),
}
80
/// Error initializing the Kubernetes backend at startup. Surfaced to the
/// operator so a misconfigured cluster fails fast rather than silently
/// falling back to Docker.
#[derive(Debug, thiserror::Error)]
pub enum BackendInitError {
    /// The Kubernetes environment configuration was rejected; the wrapped
    /// error's own message is displayed unchanged (`transparent`).
    #[error(transparent)]
    Env(#[from] fakecloud_k8s::K8sEnvError),
    /// Connecting to the cluster failed; the payload is the failure message.
    #[error("failed to connect to the Kubernetes cluster: {0}")]
    Connect(String),
}
91
92impl RdsRuntime {
93    pub fn new(server_port: u16) -> Option<Self> {
94        // CLI detection, host alias, `--add-host` injection, and the
95        // in-container sibling address all come from the shared
96        // `container_net` helper so RDS can't drift from Lambda/ECS/
97        // ElastiCache on the issue #1539 fix (podman gets no `--add-host`
98        // and the `host.containers.internal` alias; macOS docker gets
99        // `host-gateway`; Linux docker gets the resolved bridge IP).
100        let cli = detect_container_cli()?;
101        let net = HostNetworking::detect(&cli);
102
103        Some(Self {
104            cli,
105            containers: RwLock::new(HashMap::new()),
106            instance_id: format!("fakecloud-{}", std::process::id()),
107            net,
108            server_port,
109            image_cache: RwLock::new(HashMap::new()),
110            k8s: None,
111        })
112    }
113
114    /// Construct the Kubernetes backend. `server_port` is fakecloud's
115    /// bound port (used when `FAKECLOUD_K8S_SELF_URL` omits one). Fails
116    /// fast on misconfiguration — never silently degrades to Docker.
117    pub async fn new_k8s(server_port: u16) -> Result<Self, BackendInitError> {
118        let db = k8s::K8sDb::from_env(server_port).await?;
119        Ok(Self {
120            cli: String::new(),
121            containers: RwLock::new(HashMap::new()),
122            instance_id: format!("fakecloud-{}", std::process::id()),
123            // Docker networking is unused on the k8s backend (Pod addresses
124            // come from `K8sDb`); a host-loopback default keeps the field
125            // populated without affecting behavior.
126            net: HostNetworking {
127                host_alias: String::new(),
128                add_host_arg: None,
129                sibling_host: "127.0.0.1".to_string(),
130            },
131            server_port,
132            image_cache: RwLock::new(HashMap::new()),
133            k8s: Some(db),
134        })
135    }
136
137    pub fn cli_name(&self) -> &str {
138        if self.k8s.is_some() {
139            "kubernetes"
140        } else {
141            &self.cli
142        }
143    }
144
145    /// Sweep DB Pods orphaned by a previous process (k8s only; no-op on
146    /// the Docker backend, which the shared container reaper handles).
147    pub async fn reap_stale(&self) {
148        if let Some(k) = &self.k8s {
149            k.reap_stale().await;
150        }
151    }
152
153    #[allow(clippy::too_many_arguments)]
154    pub async fn ensure_postgres(
155        &self,
156        db_instance_identifier: &str,
157        engine: &str,
158        engine_version: &str,
159        username: &str,
160        password: &str,
161        db_name: &str,
162        account_id: &str,
163        region: &str,
164    ) -> Result<RunningDbContainer, RuntimeError> {
165        if let Some(k) = &self.k8s {
166            let running = k
167                .ensure(
168                    db_instance_identifier,
169                    engine,
170                    engine_version,
171                    username,
172                    password,
173                    db_name,
174                    account_id,
175                    region,
176                )
177                .await?;
178            self.containers
179                .write()
180                .insert(db_instance_identifier.to_string(), running.clone());
181            return Ok(running);
182        }
183        self.stop_container(db_instance_identifier).await;
184
185        // Determine Docker image and port based on engine. Postgres,
186        // MySQL, and MariaDB all use prebuilt fakecloud-* images that
187        // bake in the bridge UDFs / extensions and call back into the
188        // host fakecloud server; the heavier engines (oracle/mssql/db2)
189        // stay on upstream images. `bridge_engine_version` is `Some(_)`
190        // for the bridge-aware engines and gates the `--add-host`
191        // setup below.
192        let (image, port, env_vars, bridge_engine_version) = match engine {
193            "postgres" => {
194                let major_version = engine_version.split('.').next().unwrap_or("16");
195                let image = self.ensure_postgres_image(major_version).await?;
196                let env_vars = vec![
197                    format!("POSTGRES_USER={username}"),
198                    format!("POSTGRES_PASSWORD={password}"),
199                    format!("POSTGRES_DB={db_name}"),
200                    format!(
201                        "FAKECLOUD_ENDPOINT=http://{}:{}",
202                        self.net.host_alias, self.server_port
203                    ),
204                    format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
205                    format!("FAKECLOUD_REGION={region}"),
206                ];
207                (image, "5432", env_vars, Some(major_version.to_string()))
208            }
209            "mysql" => {
210                // 5.7 was dropped after Oracle community support ended
211                // (Oct 2023) — the image base no longer ships the build
212                // deps we need for the UDF. Any 5.7.* engine version
213                // resolves to 8.0.
214                let _ = engine_version;
215                let major_version = "8.0";
216                let image = self.ensure_mysql_image(major_version).await?;
217                let env_vars = vec![
218                    format!("MYSQL_ROOT_PASSWORD={password}"),
219                    format!("MYSQL_USER={username}"),
220                    format!("MYSQL_PASSWORD={password}"),
221                    format!("MYSQL_DATABASE={db_name}"),
222                    format!(
223                        "FAKECLOUD_ENDPOINT=http://{}:{}",
224                        self.net.host_alias, self.server_port
225                    ),
226                    format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
227                    format!("FAKECLOUD_REGION={region}"),
228                ];
229                (image, "3306", env_vars, Some(major_version.to_string()))
230            }
231            "mariadb" => {
232                let major_version = if engine_version.starts_with("10.11") {
233                    "10.11"
234                } else if engine_version.starts_with("11.4") {
235                    "11.4"
236                } else {
237                    "10.6"
238                };
239                let image = self.ensure_mariadb_image(major_version).await?;
240                let env_vars = vec![
241                    format!("MARIADB_ROOT_PASSWORD={password}"),
242                    format!("MARIADB_USER={username}"),
243                    format!("MARIADB_PASSWORD={password}"),
244                    format!("MARIADB_DATABASE={db_name}"),
245                    format!(
246                        "FAKECLOUD_ENDPOINT=http://{}:{}",
247                        self.net.host_alias, self.server_port
248                    ),
249                    format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
250                    format!("FAKECLOUD_REGION={region}"),
251                ];
252                (image, "3306", env_vars, Some(major_version.to_string()))
253            }
254            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
255                // Oracle Database Free is the no-cost dev edition shipped by
256                // Oracle. The container exposes a "FREEPDB1" pluggable
257                // database and creates the SYSTEM user with the password
258                // from ORACLE_PASSWORD.
259                let image = "gvenzl/oracle-free:23-slim".to_string();
260                let env_vars = vec![
261                    format!("ORACLE_PASSWORD={password}"),
262                    format!("APP_USER={username}"),
263                    format!("APP_USER_PASSWORD={password}"),
264                    format!("ORACLE_DATABASE={db_name}"),
265                ];
266                (image, "1521", env_vars, None)
267            }
268            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
269                // SQL Server Express is free for dev/test with no license
270                // ceiling. SA password must satisfy MSSQL's complexity
271                // requirements (>=8 chars, mix of classes); callers should
272                // supply one that does or the container will refuse to
273                // start.
274                let image = "mcr.microsoft.com/mssql/server:2022-latest".to_string();
275                let env_vars = vec![
276                    "ACCEPT_EULA=Y".to_string(),
277                    format!("MSSQL_SA_PASSWORD={password}"),
278                    "MSSQL_PID=Express".to_string(),
279                ];
280                (image, "1433", env_vars, None)
281            }
282            "db2-se" | "db2-ae" => {
283                // Db2 Community Edition is free under the standard IBM
284                // Community License. The container exposes a single
285                // database named after DBNAME, owned by db2inst1.
286                let image = "icr.io/db2_community/db2:latest".to_string();
287                let env_vars = vec![
288                    "LICENSE=accept".to_string(),
289                    "DB2INSTANCE=db2inst1".to_string(),
290                    format!("DB2INST1_PASSWORD={password}"),
291                    format!("DBNAME={db_name}"),
292                ];
293                (image, "50000", env_vars, None)
294            }
295            _ => {
296                return Err(RuntimeError::ContainerStartFailed(format!(
297                    "Unsupported engine: {}",
298                    engine
299                )))
300            }
301        };
302
303        // Db2 needs --privileged to set kernel parameters during startup.
304        let needs_privileged = matches!(engine, "db2-se" | "db2-ae");
305
306        // Build container create args
307        let mut args = vec![
308            "create".to_string(),
309            "-p".to_string(),
310            format!(":{}", port),
311            "--label".to_string(),
312            format!("fakecloud-rds={db_instance_identifier}"),
313            "--label".to_string(),
314            format!("fakecloud-instance={}", self.instance_id),
315        ];
316
317        if needs_privileged {
318            args.push("--privileged".to_string());
319        }
320
321        // Bridge-aware engines (postgres aws_lambda, mysql/mariadb
322        // fakecloud_post UDF) call back into fakecloud over HTTP. Inject the
323        // host-gateway `--add-host` mapping so the in-container code can
324        // resolve the host alias. No-op under podman, which provides
325        // `host.containers.internal` natively — passing `host-gateway` there
326        // fails with "host containers internal IP address is empty" (#1539).
327        if bridge_engine_version.is_some() {
328            self.net.push_add_host_args(&mut args);
329        }
330
331        for env_var in env_vars {
332            args.push("-e".to_string());
333            args.push(env_var);
334        }
335
336        args.push(image);
337
338        let output = tokio::process::Command::new(&self.cli)
339            .args(&args)
340            .output()
341            .await
342            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
343
344        if !output.status.success() {
345            return Err(RuntimeError::ContainerStartFailed(
346                String::from_utf8_lossy(&output.stderr).trim().to_string(),
347            ));
348        }
349
350        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
351        let start_result = tokio::process::Command::new(&self.cli)
352            .args(["start", &container_id])
353            .output()
354            .await
355            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
356
357        if !start_result.status.success() {
358            self.remove_container(&container_id).await;
359            return Err(RuntimeError::ContainerStartFailed(format!(
360                "container start failed: {}",
361                String::from_utf8_lossy(&start_result.stderr).trim()
362            )));
363        }
364
365        let host_port = match self.lookup_port(&container_id, port).await {
366            Ok(host_port) => host_port,
367            Err(error) => {
368                self.remove_container(&container_id).await;
369                return Err(error);
370            }
371        };
372
373        // Wait for database to be ready
374        let wait_result = match engine {
375            "postgres" => {
376                self.wait_for_postgres(username, password, db_name, host_port)
377                    .await
378            }
379            "mysql" | "mariadb" => {
380                self.wait_for_mysql(username, password, db_name, host_port)
381                    .await
382            }
383            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
384                self.wait_for_oracle(&container_id, host_port).await
385            }
386            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
387                self.wait_for_sqlserver(&container_id, host_port).await
388            }
389            "db2-se" | "db2-ae" => self.wait_for_db2(&container_id, host_port).await,
390            _ => unreachable!("engine already validated"),
391        };
392
393        if let Err(error) = wait_result {
394            self.remove_container(&container_id).await;
395            return Err(error);
396        }
397
398        let running = RunningDbContainer {
399            container_id,
400            host_port,
401            // 127.0.0.1 on the host; host.docker.internal /
402            // host.containers.internal when fakecloud is itself
403            // containerized (issue #1539).
404            endpoint_address: self.net.sibling_host.clone(),
405            endpoint_port: host_port,
406        };
407        self.containers
408            .write()
409            .insert(db_instance_identifier.to_string(), running.clone());
410        Ok(running)
411    }
412
413    pub async fn stop_container(&self, db_instance_identifier: &str) {
414        let container = self.containers.write().remove(db_instance_identifier);
415        if let Some(container) = container {
416            if let Some(k) = &self.k8s {
417                k.delete_pod(&container.container_id).await;
418            } else {
419                self.remove_container(&container.container_id).await;
420            }
421        }
422    }
423
424    pub async fn restart_container(
425        &self,
426        db_instance_identifier: &str,
427        engine: &str,
428        username: &str,
429        password: &str,
430        db_name: &str,
431    ) -> Result<RunningDbContainer, RuntimeError> {
432        if let Some(k) = &self.k8s {
433            let running = k
434                .restart(db_instance_identifier, engine, username, password, db_name)
435                .await?;
436            self.containers
437                .write()
438                .insert(db_instance_identifier.to_string(), running.clone());
439            return Ok(running);
440        }
441        let running = self
442            .containers
443            .read()
444            .get(db_instance_identifier)
445            .cloned()
446            .ok_or(RuntimeError::Unavailable)?;
447
448        let output = tokio::process::Command::new(&self.cli)
449            .args(["restart", &running.container_id])
450            .output()
451            .await
452            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
453
454        if !output.status.success() {
455            return Err(RuntimeError::ContainerStartFailed(format!(
456                "container restart failed: {}",
457                String::from_utf8_lossy(&output.stderr).trim()
458            )));
459        }
460
461        let port = match engine {
462            "postgres" => "5432",
463            "mysql" | "mariadb" => "3306",
464            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "1521",
465            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "1433",
466            "db2-se" | "db2-ae" => "50000",
467            _ => "5432", // fallback
468        };
469
470        let host_port = self.lookup_port(&running.container_id, port).await?;
471
472        match engine {
473            "postgres" => {
474                self.wait_for_postgres(username, password, db_name, host_port)
475                    .await?
476            }
477            "mysql" | "mariadb" => {
478                self.wait_for_mysql(username, password, db_name, host_port)
479                    .await?
480            }
481            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
482                self.wait_for_oracle(&running.container_id, host_port)
483                    .await?
484            }
485            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
486                self.wait_for_sqlserver(&running.container_id, host_port)
487                    .await?
488            }
489            "db2-se" | "db2-ae" => self.wait_for_db2(&running.container_id, host_port).await?,
490            _ => {
491                self.wait_for_postgres(username, password, db_name, host_port)
492                    .await?
493            }
494        };
495        let running = RunningDbContainer {
496            container_id: running.container_id,
497            host_port,
498            endpoint_address: self.net.sibling_host.clone(),
499            endpoint_port: host_port,
500        };
501        self.containers
502            .write()
503            .insert(db_instance_identifier.to_string(), running.clone());
504        Ok(running)
505    }
506
507    pub async fn stop_all(&self) {
508        let containers: Vec<String> = {
509            let mut containers = self.containers.write();
510            containers
511                .drain()
512                .map(|(_, container)| container.container_id)
513                .collect()
514        };
515        for container_id in containers {
516            self.remove_container(&container_id).await;
517        }
518    }
519
520    async fn lookup_port(&self, container_id: &str, port: &str) -> Result<u16, RuntimeError> {
521        let port_output = tokio::process::Command::new(&self.cli)
522            .args(["port", container_id, port])
523            .output()
524            .await
525            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
526
527        let port_str = String::from_utf8_lossy(&port_output.stdout);
528        port_str
529            .trim()
530            .rsplit(':')
531            .next()
532            .and_then(|value| value.parse::<u16>().ok())
533            .ok_or_else(|| {
534                RuntimeError::ContainerStartFailed(format!(
535                    "could not determine container port from '{}'",
536                    port_str.trim()
537                ))
538            })
539    }
540
541    async fn wait_for_postgres(
542        &self,
543        username: &str,
544        password: &str,
545        db_name: &str,
546        host_port: u16,
547    ) -> Result<(), RuntimeError> {
548        for _ in 0..40 {
549            tokio::time::sleep(Duration::from_millis(500)).await;
550            let connection_string = format!(
551                "host={} port={host_port} user={username} password={password} dbname={db_name}",
552                self.net.sibling_host
553            );
554            if let Ok((client, connection)) =
555                tokio_postgres::connect(&connection_string, NoTls).await
556            {
557                tokio::spawn(async move {
558                    let _ = connection.await;
559                });
560                if client.simple_query("SELECT 1").await.is_ok() {
561                    return Ok(());
562                }
563            }
564        }
565
566        Err(RuntimeError::ContainerStartFailed(
567            "postgres container did not become ready within 20 seconds".to_string(),
568        ))
569    }
570
571    async fn wait_for_mysql(
572        &self,
573        username: &str,
574        password: &str,
575        db_name: &str,
576        host_port: u16,
577    ) -> Result<(), RuntimeError> {
578        use mysql_async::prelude::*;
579        use mysql_async::OptsBuilder;
580
581        for attempt in 1..=40 {
582            let opts = OptsBuilder::default()
583                .ip_or_hostname(self.net.sibling_host.as_str())
584                .tcp_port(host_port)
585                .user(Some(username))
586                .pass(Some(password))
587                .db_name(Some(db_name));
588
589            match mysql_async::Conn::new(opts).await {
590                Ok(mut conn) => {
591                    if conn.query_drop("SELECT 1").await.is_ok() {
592                        let _ = conn.disconnect().await;
593                        return Ok(());
594                    }
595                }
596                Err(_) => {
597                    if attempt < 40 {
598                        tokio::time::sleep(Duration::from_millis(500)).await;
599                    }
600                    continue;
601                }
602            }
603        }
604
605        Err(RuntimeError::ContainerStartFailed(
606            "MySQL/MariaDB container did not become ready within 20 seconds".to_string(),
607        ))
608    }
609
610    /// Wait for Oracle Database Free to finish bootstrapping. The
611    /// `gvenzl/oracle-free` image prints `DATABASE IS READY TO USE!`
612    /// to stdout once the listener accepts connections, so we poll
613    /// `docker logs` until that marker appears (or the deadline elapses).
614    /// Oracle XE/Free typically takes 30-90 seconds on first start.
615    async fn wait_for_oracle(
616        &self,
617        container_id: &str,
618        host_port: u16,
619    ) -> Result<(), RuntimeError> {
620        self.wait_for_log_marker(container_id, "DATABASE IS READY TO USE!", 240)
621            .await?;
622        self.wait_for_tcp(host_port, 30).await
623    }
624
625    /// Wait for SQL Server to be ready. The official mssql/server image
626    /// emits `SQL Server is now ready for client connections.` once it
627    /// accepts TCP connections on 1433.
628    async fn wait_for_sqlserver(
629        &self,
630        container_id: &str,
631        host_port: u16,
632    ) -> Result<(), RuntimeError> {
633        self.wait_for_log_marker(
634            container_id,
635            "SQL Server is now ready for client connections",
636            180,
637        )
638        .await?;
639        self.wait_for_tcp(host_port, 30).await
640    }
641
642    /// Wait for Db2 Community Edition to finish setup. The
643    /// `icr.io/db2_community/db2` image prints `Setup has completed.`
644    /// once the instance is up and the database has been created.
645    async fn wait_for_db2(&self, container_id: &str, host_port: u16) -> Result<(), RuntimeError> {
646        self.wait_for_log_marker(container_id, "Setup has completed", 360)
647            .await?;
648        self.wait_for_tcp(host_port, 60).await
649    }
650
651    /// Poll `docker logs <container>` until the supplied marker appears
652    /// in stdout or stderr. `deadline_secs` caps total wait.
653    async fn wait_for_log_marker(
654        &self,
655        container_id: &str,
656        marker: &str,
657        deadline_secs: u64,
658    ) -> Result<(), RuntimeError> {
659        let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
660        while std::time::Instant::now() < deadline {
661            let output = tokio::process::Command::new(&self.cli)
662                .args(["logs", container_id])
663                .output()
664                .await
665                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
666            let stdout = String::from_utf8_lossy(&output.stdout);
667            let stderr = String::from_utf8_lossy(&output.stderr);
668            if stdout.contains(marker) || stderr.contains(marker) {
669                return Ok(());
670            }
671            tokio::time::sleep(Duration::from_secs(2)).await;
672        }
673        Err(RuntimeError::ContainerStartFailed(format!(
674            "container did not log '{}' within {} seconds",
675            marker, deadline_secs
676        )))
677    }
678
679    /// TCP-probe the host port until it accepts a connection or the
680    /// deadline elapses. Use after `wait_for_log_marker` since the
681    /// listener may bind a moment after the readiness log line.
682    async fn wait_for_tcp(&self, host_port: u16, deadline_secs: u64) -> Result<(), RuntimeError> {
683        let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
684        let host = self.net.sibling_host.as_str();
685        while std::time::Instant::now() < deadline {
686            if tokio::net::TcpStream::connect((host, host_port))
687                .await
688                .is_ok()
689            {
690                return Ok(());
691            }
692            tokio::time::sleep(Duration::from_millis(500)).await;
693        }
694        Err(RuntimeError::ContainerStartFailed(format!(
695            "TCP probe to {}:{} did not succeed within {}s",
696            host, host_port, deadline_secs
697        )))
698    }
699
700    async fn remove_container(&self, container_id: &str) {
701        let _ = tokio::process::Command::new(&self.cli)
702            .args(["rm", "-f", container_id])
703            .output()
704            .await;
705    }
706
    /// Resolve the fakecloud-postgres image tag for a given major version,
    /// building the image locally when needed.
    ///
    /// The image bakes plpython3u plus the aws_commons and aws_lambda
    /// extension files so users can run
    /// `CREATE EXTENSION aws_lambda CASCADE` inside any database. The tag
    /// includes a content hash so changes to the embedded assets invalidate
    /// the local cache automatically.
    ///
    /// Resolution is delegated to `ensure_bridge_image`, which tries (in
    /// order): in-process cache, `docker image inspect` for a copy already
    /// on the daemon, `docker pull` of the prebuilt image published by CI,
    /// and finally a local `docker build` from the embedded Dockerfile +
    /// extension assets (the closure passed below). The pull path is the
    /// happy path for end users on tagged releases; the build path covers
    /// dev / unreleased versions / airgapped setups.
    ///
    /// Honors:
    /// - `FAKECLOUD_POSTGRES_REGISTRY` — registry prefix (default
    ///   `ghcr.io/faiscadev`); useful for private mirrors.
    /// - `FAKECLOUD_REBUILD_POSTGRES_IMAGE` — when set to a non-empty
    ///   value, skip inspect + pull and force a fresh local build.
    ///   Use after editing the embedded Dockerfile or extension SQL.
    pub(crate) async fn ensure_postgres_image(
        &self,
        major_version: &str,
    ) -> Result<String, RuntimeError> {
        let tag = bridge_image_tag("fakecloud-postgres", major_version);
        // The closure is the local-build fallback handed to the shared helper.
        self.ensure_bridge_image(&tag, |tag| async move {
            self.build_postgres_image_local(major_version, &tag).await
        })
        .await
    }
737
738    async fn docker_image_exists(&self, tag: &str) -> bool {
739        tokio::process::Command::new(&self.cli)
740            .args(["image", "inspect", tag])
741            .stdout(std::process::Stdio::null())
742            .stderr(std::process::Stdio::null())
743            .status()
744            .await
745            .map(|status| status.success())
746            .unwrap_or(false)
747    }
748
749    async fn try_pull_image(&self, tag: &str) -> bool {
750        tracing::info!(tag = %tag, "Pulling prebuilt fakecloud-postgres image");
751        let output = match tokio::process::Command::new(&self.cli)
752            .args(["pull", tag])
753            .output()
754            .await
755        {
756            Ok(output) => output,
757            Err(e) => {
758                tracing::debug!(tag = %tag, error = %e, "docker pull failed to spawn");
759                return false;
760            }
761        };
762        if output.status.success() {
763            return true;
764        }
765        tracing::info!(
766            tag = %tag,
767            stderr = %String::from_utf8_lossy(&output.stderr).trim(),
768            "Prebuilt postgres image not available, falling back to local build"
769        );
770        false
771    }
772
773    async fn build_postgres_image_local(
774        &self,
775        major_version: &str,
776        tag: &str,
777    ) -> Result<(), RuntimeError> {
778        let assets: [(&str, &str); 8] = [
779            ("Dockerfile", POSTGRES_DOCKERFILE),
780            ("aws_commons.control", AWS_COMMONS_CONTROL),
781            ("aws_commons--1.1.sql", AWS_COMMONS_SQL),
782            ("aws_commons--1.0--1.1.sql", AWS_COMMONS_UPGRADE_SQL),
783            ("aws_lambda.control", AWS_LAMBDA_CONTROL),
784            ("aws_lambda--1.0.sql", AWS_LAMBDA_SQL),
785            ("aws_s3.control", AWS_S3_CONTROL),
786            ("aws_s3--1.0.sql", AWS_S3_SQL),
787        ];
788        self.build_image_local(
789            tag,
790            &assets,
791            &format!("PG_VERSION={major_version}"),
792            "fakecloud-postgres",
793        )
794        .await
795    }
796
797    /// Pull-first / build-fallback for the prebuilt fakecloud-mysql
798    /// image. Mirrors `ensure_postgres_image`. The image bakes a small
799    /// libcurl-backed UDF + Aurora-compatible `mysql.lambda_async` /
800    /// `mysql.lambda_sync` stored procedures.
801    pub(crate) async fn ensure_mysql_image(
802        &self,
803        major_version: &str,
804    ) -> Result<String, RuntimeError> {
805        let tag = bridge_image_tag("fakecloud-mysql", major_version);
806        self.ensure_bridge_image(&tag, |tag| async move {
807            self.build_mysql_image_local(major_version, &tag).await
808        })
809        .await
810    }
811
812    pub(crate) async fn ensure_mariadb_image(
813        &self,
814        major_version: &str,
815    ) -> Result<String, RuntimeError> {
816        let tag = bridge_image_tag("fakecloud-mariadb", major_version);
817        self.ensure_bridge_image(&tag, |tag| async move {
818            self.build_mariadb_image_local(major_version, &tag).await
819        })
820        .await
821    }
822
823    /// Shared pull-first/build-fallback orchestration used by every
824    /// bridge-aware engine. Holds the per-tag mutex, checks the local
825    /// daemon first, then tries the prebuilt image, and finally
826    /// invokes the supplied local-build closure.
827    async fn ensure_bridge_image<F, Fut>(
828        &self,
829        tag: &str,
830        build_local: F,
831    ) -> Result<String, RuntimeError>
832    where
833        F: FnOnce(String) -> Fut,
834        Fut: std::future::Future<Output = Result<(), RuntimeError>>,
835    {
836        let lock = {
837            let mut cache = self.image_cache.write();
838            cache
839                .entry(tag.to_string())
840                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(false)))
841                .clone()
842        };
843        let mut resolved = lock.lock().await;
844        if *resolved {
845            return Ok(tag.to_string());
846        }
847
848        let force_rebuild = std::env::var("FAKECLOUD_REBUILD_POSTGRES_IMAGE")
849            .map(|v| !v.is_empty())
850            .unwrap_or(false);
851
852        if !force_rebuild {
853            if self.docker_image_exists(tag).await {
854                *resolved = true;
855                return Ok(tag.to_string());
856            }
857            if self.try_pull_image(tag).await {
858                *resolved = true;
859                return Ok(tag.to_string());
860            }
861        }
862
863        build_local(tag.to_string()).await?;
864        *resolved = true;
865        Ok(tag.to_string())
866    }
867
868    async fn build_mysql_image_local(
869        &self,
870        major_version: &str,
871        tag: &str,
872    ) -> Result<(), RuntimeError> {
873        let assets: [(&str, &str); 4] = [
874            ("Dockerfile", MYSQL_DOCKERFILE),
875            ("fakecloud_udf.c", MYSQL_UDF_C),
876            ("fakecloud-bootstrap.sh", MYSQL_BOOTSTRAP_SH),
877            ("99-fakecloud-bootstrap.sql.tmpl", MYSQL_BOOTSTRAP_SQL),
878        ];
879        self.build_image_local(
880            tag,
881            &assets,
882            &format!("MYSQL_VERSION={major_version}"),
883            "fakecloud-mysql",
884        )
885        .await
886    }
887
888    async fn build_mariadb_image_local(
889        &self,
890        major_version: &str,
891        tag: &str,
892    ) -> Result<(), RuntimeError> {
893        let assets: [(&str, &str); 4] = [
894            ("Dockerfile", MARIADB_DOCKERFILE),
895            ("fakecloud_udf.c", MARIADB_UDF_C),
896            ("fakecloud-bootstrap.sh", MARIADB_BOOTSTRAP_SH),
897            ("99-fakecloud-bootstrap.sql.tmpl", MARIADB_BOOTSTRAP_SQL),
898        ];
899        self.build_image_local(
900            tag,
901            &assets,
902            &format!("MARIADB_VERSION={major_version}"),
903            "fakecloud-mariadb",
904        )
905        .await
906    }
907
908    async fn build_image_local(
909        &self,
910        tag: &str,
911        assets: &[(&str, &str)],
912        build_arg: &str,
913        image_label: &str,
914    ) -> Result<(), RuntimeError> {
915        let build_dir =
916            tempfile::tempdir().map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
917        for (name, contents) in assets {
918            tokio::fs::write(build_dir.path().join(name), contents)
919                .await
920                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
921        }
922
923        tracing::info!(
924            tag = %tag,
925            image = %image_label,
926            "Building {image_label} image locally (first use can take ~60s)"
927        );
928
929        let output = tokio::process::Command::new(&self.cli)
930            .args(["build", "--build-arg", build_arg, "-t", tag, "."])
931            .current_dir(build_dir.path())
932            .output()
933            .await
934            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
935
936        if !output.status.success() {
937            return Err(RuntimeError::ContainerStartFailed(format!(
938                "docker build for {} failed: {}",
939                tag,
940                String::from_utf8_lossy(&output.stderr).trim()
941            )));
942        }
943
944        Ok(())
945    }
946
947    pub async fn dump_database(
948        &self,
949        db_instance_identifier: &str,
950        engine: &str,
951        username: &str,
952        password: &str,
953        db_name: &str,
954    ) -> Result<Vec<u8>, RuntimeError> {
955        let container = self
956            .containers
957            .read()
958            .get(db_instance_identifier)
959            .cloned()
960            .ok_or(RuntimeError::Unavailable)?;
961
962        if let Some(k) = &self.k8s {
963            return k
964                .dump_database(&container.container_id, engine, username, password, db_name)
965                .await;
966        }
967
968        let args: Vec<String> = match engine {
969            "mysql" | "mariadb" => vec![
970                "exec".into(),
971                container.container_id.clone(),
972                "mysqldump".into(),
973                "-u".into(),
974                username.into(),
975                format!("-p{password}"),
976                db_name.into(),
977            ],
978            "postgres" => vec![
979                "exec".into(),
980                container.container_id.clone(),
981                "pg_dump".into(),
982                "-U".into(),
983                username.into(),
984                "-d".into(),
985                db_name.into(),
986                "--no-password".into(),
987            ],
988            // Heavy engines don't ship with a portable dump CLI we can
989            // shell out to from the host the same way pg_dump and
990            // mysqldump are guaranteed available. Surface a clear
991            // error so callers (snapshot/read-replica) don't silently
992            // run the wrong tool against an Oracle/SQL Server/Db2
993            // container.
994            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
995            | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
996                return Err(RuntimeError::ContainerStartFailed(format!(
997                    "engine {engine} is not yet supported by the snapshot/read-replica path; \
998                     emulator stores the API state but cannot dump the underlying database"
999                )));
1000            }
1001            other => {
1002                return Err(RuntimeError::ContainerStartFailed(format!(
1003                    "engine {other} is not supported by dump_database"
1004                )));
1005            }
1006        };
1007
1008        let output = tokio::process::Command::new(&self.cli)
1009            .args(&args)
1010            .output()
1011            .await
1012            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1013
1014        if !output.status.success() {
1015            return Err(RuntimeError::ContainerStartFailed(format!(
1016                "dump failed: {}",
1017                String::from_utf8_lossy(&output.stderr).trim()
1018            )));
1019        }
1020
1021        Ok(output.stdout)
1022    }
1023
1024    /// Cat a file inside the running container by absolute path.
1025    /// Returns `Unavailable` when the runtime/daemon is missing or the
1026    /// instance has no live container; returns `ContainerStartFailed`
1027    /// when the file is missing or unreadable. Callers map those to the
1028    /// AWS-shaped responses.
1029    pub async fn read_log_file(
1030        &self,
1031        db_instance_identifier: &str,
1032        container_path: &str,
1033    ) -> Result<Vec<u8>, RuntimeError> {
1034        let container = self
1035            .containers
1036            .read()
1037            .get(db_instance_identifier)
1038            .cloned()
1039            .ok_or(RuntimeError::Unavailable)?;
1040
1041        if let Some(k) = &self.k8s {
1042            return k.read_file(&container.container_id, container_path).await;
1043        }
1044
1045        let output = tokio::process::Command::new(&self.cli)
1046            .args(["exec", &container.container_id, "cat", container_path])
1047            .output()
1048            .await
1049            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1050
1051        if !output.status.success() {
1052            return Err(RuntimeError::ContainerStartFailed(format!(
1053                "cat {container_path} failed: {}",
1054                String::from_utf8_lossy(&output.stderr).trim()
1055            )));
1056        }
1057
1058        Ok(output.stdout)
1059    }
1060
1061    pub async fn restore_database(
1062        &self,
1063        db_instance_identifier: &str,
1064        engine: &str,
1065        username: &str,
1066        password: &str,
1067        db_name: &str,
1068        dump_data: &[u8],
1069    ) -> Result<(), RuntimeError> {
1070        let container = self
1071            .containers
1072            .read()
1073            .get(db_instance_identifier)
1074            .cloned()
1075            .ok_or(RuntimeError::Unavailable)?;
1076
1077        if let Some(k) = &self.k8s {
1078            return k
1079                .restore_database(
1080                    &container.container_id,
1081                    engine,
1082                    username,
1083                    password,
1084                    db_name,
1085                    dump_data,
1086                )
1087                .await;
1088        }
1089
1090        let args: Vec<String> = match engine {
1091            "mysql" | "mariadb" => vec![
1092                "exec".into(),
1093                "-i".into(),
1094                container.container_id.clone(),
1095                "mysql".into(),
1096                "-u".into(),
1097                username.into(),
1098                format!("-p{password}"),
1099                db_name.into(),
1100            ],
1101            "postgres" => vec![
1102                "exec".into(),
1103                "-i".into(),
1104                container.container_id.clone(),
1105                "psql".into(),
1106                "-U".into(),
1107                username.into(),
1108                "-d".into(),
1109                db_name.into(),
1110                "--no-password".into(),
1111                "-v".into(),
1112                "ON_ERROR_STOP=1".into(),
1113            ],
1114            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
1115            | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
1116                return Err(RuntimeError::ContainerStartFailed(format!(
1117                    "engine {engine} is not yet supported by the snapshot-restore path"
1118                )));
1119            }
1120            other => {
1121                return Err(RuntimeError::ContainerStartFailed(format!(
1122                    "engine {other} is not supported by restore_database"
1123                )));
1124            }
1125        };
1126
1127        let mut child = tokio::process::Command::new(&self.cli)
1128            .args(&args)
1129            .stdin(std::process::Stdio::piped())
1130            .stdout(std::process::Stdio::piped())
1131            .stderr(std::process::Stdio::piped())
1132            .spawn()
1133            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1134
1135        if let Some(mut stdin) = child.stdin.take() {
1136            use tokio::io::AsyncWriteExt;
1137            stdin
1138                .write_all(dump_data)
1139                .await
1140                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1141            drop(stdin);
1142        }
1143
1144        let output = child
1145            .wait_with_output()
1146            .await
1147            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1148
1149        if !output.status.success() {
1150            return Err(RuntimeError::ContainerStartFailed(format!(
1151                "restore failed: {}",
1152                String::from_utf8_lossy(&output.stderr).trim()
1153            )));
1154        }
1155
1156        Ok(())
1157    }
1158}
1159
1160/// Build the prebuilt-image reference for a given engine + major
1161/// version. Uses `<registry>/<image>:<major>-<fakecloud-version>`,
1162/// where the registry comes from `FAKECLOUD_POSTGRES_REGISTRY` (kept
1163/// historical name; defaults to the public `ghcr.io/faiscadev`).
1164/// The version pin guarantees the runtime asks the daemon for the
1165/// same image CI publishes for this fakecloud release; mismatched
1166/// assets force a local rebuild via the fall-through in
1167/// `ensure_bridge_image`.
1168pub(crate) fn bridge_image_tag(image: &str, major_version: &str) -> String {
1169    let registry = std::env::var("FAKECLOUD_POSTGRES_REGISTRY")
1170        .unwrap_or_else(|_| DEFAULT_POSTGRES_REGISTRY.to_string());
1171    let registry = registry.trim_end_matches('/');
1172    format!(
1173        "{}/{}:{}-{}",
1174        registry,
1175        image,
1176        major_version,
1177        env!("CARGO_PKG_VERSION")
1178    )
1179}
1180
#[cfg(test)]
mod tests {
    use super::*;

    /// Environment variable consulted by `bridge_image_tag`.
    const REGISTRY_VAR: &str = "FAKECLOUD_POSTGRES_REGISTRY";

    /// Puts `REGISTRY_VAR` back to the captured value when dropped.
    /// Because `Drop` also runs while a failed assertion unwinds, an
    /// override made by the test cannot leak into other tests that
    /// run in the same process.
    struct RegistryVarGuard(Option<String>);

    impl RegistryVarGuard {
        /// Remember the variable's current value (if any).
        fn capture() -> Self {
            Self(std::env::var(REGISTRY_VAR).ok())
        }
    }

    impl Drop for RegistryVarGuard {
        fn drop(&mut self) {
            match self.0.take() {
                Some(previous) => std::env::set_var(REGISTRY_VAR, previous),
                None => std::env::remove_var(REGISTRY_VAR),
            }
        }
    }

    /// Single test (rather than three) so the cases run sequentially —
    /// `bridge_image_tag` reads a process-global env var and parallel
    /// `cargo test` workers would race over it otherwise.
    #[test]
    fn bridge_image_tag_resolves_registry_overrides() {
        let _restore = RegistryVarGuard::capture();

        // No override: the default registry is used for every engine.
        std::env::remove_var(REGISTRY_VAR);
        assert_eq!(
            bridge_image_tag("fakecloud-postgres", "16"),
            format!(
                "ghcr.io/faiscadev/fakecloud-postgres:16-{}",
                env!("CARGO_PKG_VERSION")
            )
        );
        assert_eq!(
            bridge_image_tag("fakecloud-mysql", "8.0"),
            format!(
                "ghcr.io/faiscadev/fakecloud-mysql:8.0-{}",
                env!("CARGO_PKG_VERSION")
            )
        );
        assert_eq!(
            bridge_image_tag("fakecloud-mariadb", "10.11"),
            format!(
                "ghcr.io/faiscadev/fakecloud-mariadb:10.11-{}",
                env!("CARGO_PKG_VERSION")
            )
        );

        // Custom registry without a trailing slash.
        std::env::set_var(REGISTRY_VAR, "registry.example.com/team");
        assert_eq!(
            bridge_image_tag("fakecloud-postgres", "15"),
            format!(
                "registry.example.com/team/fakecloud-postgres:15-{}",
                env!("CARGO_PKG_VERSION")
            )
        );

        // A trailing slash on the override must not be doubled.
        std::env::set_var(REGISTRY_VAR, "registry.example.com/team/");
        assert_eq!(
            bridge_image_tag("fakecloud-postgres", "13"),
            format!(
                "registry.example.com/team/fakecloud-postgres:13-{}",
                env!("CARGO_PKG_VERSION")
            )
        );
    }
}