Skip to main content

fakecloud_rds/runtime/
mod.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use parking_lot::RwLock;
6use tokio_postgres::NoTls;
7
8mod k8s;
9
10const POSTGRES_DOCKERFILE: &str = include_str!("../../assets/postgres/Dockerfile");
11const AWS_COMMONS_CONTROL: &str = include_str!("../../assets/postgres/aws_commons.control");
12const AWS_COMMONS_SQL: &str = include_str!("../../assets/postgres/aws_commons--1.1.sql");
13const AWS_COMMONS_UPGRADE_SQL: &str =
14    include_str!("../../assets/postgres/aws_commons--1.0--1.1.sql");
15const AWS_LAMBDA_CONTROL: &str = include_str!("../../assets/postgres/aws_lambda.control");
16const AWS_LAMBDA_SQL: &str = include_str!("../../assets/postgres/aws_lambda--1.0.sql");
17const AWS_S3_CONTROL: &str = include_str!("../../assets/postgres/aws_s3.control");
18const AWS_S3_SQL: &str = include_str!("../../assets/postgres/aws_s3--1.0.sql");
19
20const MYSQL_DOCKERFILE: &str = include_str!("../../assets/mysql/Dockerfile");
21const MYSQL_UDF_C: &str = include_str!("../../assets/mysql/fakecloud_udf.c");
22const MYSQL_BOOTSTRAP_SH: &str = include_str!("../../assets/mysql/fakecloud-bootstrap.sh");
23const MYSQL_BOOTSTRAP_SQL: &str =
24    include_str!("../../assets/mysql/99-fakecloud-bootstrap.sql.tmpl");
25
26const MARIADB_DOCKERFILE: &str = include_str!("../../assets/mariadb/Dockerfile");
27const MARIADB_UDF_C: &str = include_str!("../../assets/mariadb/fakecloud_udf.c");
28const MARIADB_BOOTSTRAP_SH: &str = include_str!("../../assets/mariadb/fakecloud-bootstrap.sh");
29const MARIADB_BOOTSTRAP_SQL: &str =
30    include_str!("../../assets/mariadb/99-fakecloud-bootstrap.sql.tmpl");
31
32/// Default registry that hosts the prebuilt postgres images. CI publishes
33/// to `ghcr.io/faiscadev/fakecloud-postgres:<major>-<version>` on each
34/// release tag (see `.github/workflows/docker-rds-images.yml`).
35/// Override with the `FAKECLOUD_POSTGRES_REGISTRY` env var (e.g. for
36/// private mirrors); set `FAKECLOUD_REBUILD_POSTGRES_IMAGE=1` to force
37/// a local rebuild even when the published tag is reachable.
38const DEFAULT_POSTGRES_REGISTRY: &str = "ghcr.io/faiscadev";
39
/// Handle and client-facing endpoint of one running database.
///
/// The same shape serves both backends. On Docker/Podman the handle is a
/// container id and clients use a published port on `127.0.0.1`. On
/// Kubernetes the handle is a Pod name and clients use the Pod IP with the
/// engine's standard port.
#[derive(Clone, Debug)]
pub struct RunningDbContainer {
    /// Docker/Podman container id, or the Pod name on Kubernetes.
    pub container_id: String,
    /// Port published on the host (Docker) or the engine's in-Pod port (k8s).
    pub host_port: u16,
    /// Address every client connects to, including fakecloud's own readiness
    /// probes and restores: `127.0.0.1` (Docker) or the Pod IP (k8s).
    pub endpoint_address: String,
    /// Port paired with `endpoint_address`: the published host port (Docker)
    /// or the engine's standard port (k8s).
    pub endpoint_port: u16,
}
53
54pub struct RdsRuntime {
55    cli: String,
56    containers: RwLock<HashMap<String, RunningDbContainer>>,
57    instance_id: String,
58    host_ip: String,
59    server_port: u16,
60    image_cache: RwLock<HashMap<String, Arc<tokio::sync::Mutex<bool>>>>,
61    /// `Some` when running on the Kubernetes backend; the public methods
62    /// dispatch to it and the Docker fields above go unused. `None` is the
63    /// default Docker/Podman backend.
64    k8s: Option<k8s::K8sDb>,
65}
66
67#[derive(Debug, thiserror::Error)]
68pub enum RuntimeError {
69    #[error("container runtime is unavailable")]
70    Unavailable,
71    #[error("container failed to start: {0}")]
72    ContainerStartFailed(String),
73}
74
75/// Error initializing the Kubernetes backend at startup. Surfaced to the
76/// operator so a misconfigured cluster fails fast rather than silently
77/// falling back to Docker.
78#[derive(Debug, thiserror::Error)]
79pub enum BackendInitError {
80    #[error(transparent)]
81    Env(#[from] fakecloud_k8s::K8sEnvError),
82    #[error("failed to connect to the Kubernetes cluster: {0}")]
83    Connect(String),
84}
85
86impl RdsRuntime {
87    pub fn new(server_port: u16) -> Option<Self> {
88        let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
89            if cli_available(&cli) {
90                cli
91            } else {
92                return None;
93            }
94        } else if cli_available("docker") {
95            "docker".to_string()
96        } else if cli_available("podman") {
97            "podman".to_string()
98        } else {
99            return None;
100        };
101
102        // Match Lambda runtime container-to-host networking: Linux uses the
103        // bridge gateway IP directly, macOS/Windows use Docker Desktop's
104        // host-gateway alias. Containers reach fakecloud at host.docker.internal.
105        let host_ip = if cfg!(target_os = "linux") {
106            detect_bridge_gateway(&cli).unwrap_or_else(|| "172.17.0.1".to_string())
107        } else {
108            "host-gateway".to_string()
109        };
110
111        Some(Self {
112            cli,
113            containers: RwLock::new(HashMap::new()),
114            instance_id: format!("fakecloud-{}", std::process::id()),
115            host_ip,
116            server_port,
117            image_cache: RwLock::new(HashMap::new()),
118            k8s: None,
119        })
120    }
121
122    /// Construct the Kubernetes backend. `server_port` is fakecloud's
123    /// bound port (used when `FAKECLOUD_K8S_SELF_URL` omits one). Fails
124    /// fast on misconfiguration — never silently degrades to Docker.
125    pub async fn new_k8s(server_port: u16) -> Result<Self, BackendInitError> {
126        let db = k8s::K8sDb::from_env(server_port).await?;
127        Ok(Self {
128            cli: String::new(),
129            containers: RwLock::new(HashMap::new()),
130            instance_id: format!("fakecloud-{}", std::process::id()),
131            host_ip: String::new(),
132            server_port,
133            image_cache: RwLock::new(HashMap::new()),
134            k8s: Some(db),
135        })
136    }
137
138    pub fn cli_name(&self) -> &str {
139        if self.k8s.is_some() {
140            "kubernetes"
141        } else {
142            &self.cli
143        }
144    }
145
146    /// Sweep DB Pods orphaned by a previous process (k8s only; no-op on
147    /// the Docker backend, which the shared container reaper handles).
148    pub async fn reap_stale(&self) {
149        if let Some(k) = &self.k8s {
150            k.reap_stale().await;
151        }
152    }
153
154    #[allow(clippy::too_many_arguments)]
155    pub async fn ensure_postgres(
156        &self,
157        db_instance_identifier: &str,
158        engine: &str,
159        engine_version: &str,
160        username: &str,
161        password: &str,
162        db_name: &str,
163        account_id: &str,
164        region: &str,
165    ) -> Result<RunningDbContainer, RuntimeError> {
166        if let Some(k) = &self.k8s {
167            let running = k
168                .ensure(
169                    db_instance_identifier,
170                    engine,
171                    engine_version,
172                    username,
173                    password,
174                    db_name,
175                    account_id,
176                    region,
177                )
178                .await?;
179            self.containers
180                .write()
181                .insert(db_instance_identifier.to_string(), running.clone());
182            return Ok(running);
183        }
184        self.stop_container(db_instance_identifier).await;
185
186        // Determine Docker image and port based on engine. Postgres,
187        // MySQL, and MariaDB all use prebuilt fakecloud-* images that
188        // bake in the bridge UDFs / extensions and call back into the
189        // host fakecloud server; the heavier engines (oracle/mssql/db2)
190        // stay on upstream images. `bridge_engine_version` is `Some(_)`
191        // for the bridge-aware engines and gates the `--add-host`
192        // setup below.
193        let (image, port, env_vars, bridge_engine_version) = match engine {
194            "postgres" => {
195                let major_version = engine_version.split('.').next().unwrap_or("16");
196                let image = self.ensure_postgres_image(major_version).await?;
197                let env_vars = vec![
198                    format!("POSTGRES_USER={username}"),
199                    format!("POSTGRES_PASSWORD={password}"),
200                    format!("POSTGRES_DB={db_name}"),
201                    format!(
202                        "FAKECLOUD_ENDPOINT=http://host.docker.internal:{}",
203                        self.server_port
204                    ),
205                    format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
206                    format!("FAKECLOUD_REGION={region}"),
207                ];
208                (image, "5432", env_vars, Some(major_version.to_string()))
209            }
210            "mysql" => {
211                // 5.7 was dropped after Oracle community support ended
212                // (Oct 2023) — the image base no longer ships the build
213                // deps we need for the UDF. Any 5.7.* engine version
214                // resolves to 8.0.
215                let _ = engine_version;
216                let major_version = "8.0";
217                let image = self.ensure_mysql_image(major_version).await?;
218                let env_vars = vec![
219                    format!("MYSQL_ROOT_PASSWORD={password}"),
220                    format!("MYSQL_USER={username}"),
221                    format!("MYSQL_PASSWORD={password}"),
222                    format!("MYSQL_DATABASE={db_name}"),
223                    format!(
224                        "FAKECLOUD_ENDPOINT=http://host.docker.internal:{}",
225                        self.server_port
226                    ),
227                    format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
228                    format!("FAKECLOUD_REGION={region}"),
229                ];
230                (image, "3306", env_vars, Some(major_version.to_string()))
231            }
232            "mariadb" => {
233                let major_version = if engine_version.starts_with("10.11") {
234                    "10.11"
235                } else if engine_version.starts_with("11.4") {
236                    "11.4"
237                } else {
238                    "10.6"
239                };
240                let image = self.ensure_mariadb_image(major_version).await?;
241                let env_vars = vec![
242                    format!("MARIADB_ROOT_PASSWORD={password}"),
243                    format!("MARIADB_USER={username}"),
244                    format!("MARIADB_PASSWORD={password}"),
245                    format!("MARIADB_DATABASE={db_name}"),
246                    format!(
247                        "FAKECLOUD_ENDPOINT=http://host.docker.internal:{}",
248                        self.server_port
249                    ),
250                    format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
251                    format!("FAKECLOUD_REGION={region}"),
252                ];
253                (image, "3306", env_vars, Some(major_version.to_string()))
254            }
255            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
256                // Oracle Database Free is the no-cost dev edition shipped by
257                // Oracle. The container exposes a "FREEPDB1" pluggable
258                // database and creates the SYSTEM user with the password
259                // from ORACLE_PASSWORD.
260                let image = "gvenzl/oracle-free:23-slim".to_string();
261                let env_vars = vec![
262                    format!("ORACLE_PASSWORD={password}"),
263                    format!("APP_USER={username}"),
264                    format!("APP_USER_PASSWORD={password}"),
265                    format!("ORACLE_DATABASE={db_name}"),
266                ];
267                (image, "1521", env_vars, None)
268            }
269            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
270                // SQL Server Express is free for dev/test with no license
271                // ceiling. SA password must satisfy MSSQL's complexity
272                // requirements (>=8 chars, mix of classes); callers should
273                // supply one that does or the container will refuse to
274                // start.
275                let image = "mcr.microsoft.com/mssql/server:2022-latest".to_string();
276                let env_vars = vec![
277                    "ACCEPT_EULA=Y".to_string(),
278                    format!("MSSQL_SA_PASSWORD={password}"),
279                    "MSSQL_PID=Express".to_string(),
280                ];
281                (image, "1433", env_vars, None)
282            }
283            "db2-se" | "db2-ae" => {
284                // Db2 Community Edition is free under the standard IBM
285                // Community License. The container exposes a single
286                // database named after DBNAME, owned by db2inst1.
287                let image = "icr.io/db2_community/db2:latest".to_string();
288                let env_vars = vec![
289                    "LICENSE=accept".to_string(),
290                    "DB2INSTANCE=db2inst1".to_string(),
291                    format!("DB2INST1_PASSWORD={password}"),
292                    format!("DBNAME={db_name}"),
293                ];
294                (image, "50000", env_vars, None)
295            }
296            _ => {
297                return Err(RuntimeError::ContainerStartFailed(format!(
298                    "Unsupported engine: {}",
299                    engine
300                )))
301            }
302        };
303
304        // Db2 needs --privileged to set kernel parameters during startup.
305        let needs_privileged = matches!(engine, "db2-se" | "db2-ae");
306
307        // Build container create args
308        let mut args = vec![
309            "create".to_string(),
310            "-p".to_string(),
311            format!(":{}", port),
312            "--label".to_string(),
313            format!("fakecloud-rds={db_instance_identifier}"),
314            "--label".to_string(),
315            format!("fakecloud-instance={}", self.instance_id),
316        ];
317
318        if needs_privileged {
319            args.push("--privileged".to_string());
320        }
321
322        // Bridge-aware engines (postgres aws_lambda, mysql/mariadb
323        // fakecloud_post UDF) call back into fakecloud over HTTP. Wire
324        // the host gateway alias so the in-container code can resolve
325        // host.docker.internal on every platform.
326        if bridge_engine_version.is_some() {
327            args.push("--add-host".to_string());
328            args.push(format!("host.docker.internal:{}", self.host_ip));
329        }
330
331        for env_var in env_vars {
332            args.push("-e".to_string());
333            args.push(env_var);
334        }
335
336        args.push(image);
337
338        let output = tokio::process::Command::new(&self.cli)
339            .args(&args)
340            .output()
341            .await
342            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
343
344        if !output.status.success() {
345            return Err(RuntimeError::ContainerStartFailed(
346                String::from_utf8_lossy(&output.stderr).trim().to_string(),
347            ));
348        }
349
350        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
351        let start_result = tokio::process::Command::new(&self.cli)
352            .args(["start", &container_id])
353            .output()
354            .await
355            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
356
357        if !start_result.status.success() {
358            self.remove_container(&container_id).await;
359            return Err(RuntimeError::ContainerStartFailed(format!(
360                "container start failed: {}",
361                String::from_utf8_lossy(&start_result.stderr).trim()
362            )));
363        }
364
365        let host_port = match self.lookup_port(&container_id, port).await {
366            Ok(host_port) => host_port,
367            Err(error) => {
368                self.remove_container(&container_id).await;
369                return Err(error);
370            }
371        };
372
373        // Wait for database to be ready
374        let wait_result = match engine {
375            "postgres" => {
376                self.wait_for_postgres(username, password, db_name, host_port)
377                    .await
378            }
379            "mysql" | "mariadb" => {
380                self.wait_for_mysql(username, password, db_name, host_port)
381                    .await
382            }
383            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
384                self.wait_for_oracle(&container_id, host_port).await
385            }
386            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
387                self.wait_for_sqlserver(&container_id, host_port).await
388            }
389            "db2-se" | "db2-ae" => self.wait_for_db2(&container_id, host_port).await,
390            _ => unreachable!("engine already validated"),
391        };
392
393        if let Err(error) = wait_result {
394            self.remove_container(&container_id).await;
395            return Err(error);
396        }
397
398        let running = RunningDbContainer {
399            container_id,
400            host_port,
401            endpoint_address: "127.0.0.1".to_string(),
402            endpoint_port: host_port,
403        };
404        self.containers
405            .write()
406            .insert(db_instance_identifier.to_string(), running.clone());
407        Ok(running)
408    }
409
410    pub async fn stop_container(&self, db_instance_identifier: &str) {
411        let container = self.containers.write().remove(db_instance_identifier);
412        if let Some(container) = container {
413            if let Some(k) = &self.k8s {
414                k.delete_pod(&container.container_id).await;
415            } else {
416                self.remove_container(&container.container_id).await;
417            }
418        }
419    }
420
421    pub async fn restart_container(
422        &self,
423        db_instance_identifier: &str,
424        engine: &str,
425        username: &str,
426        password: &str,
427        db_name: &str,
428    ) -> Result<RunningDbContainer, RuntimeError> {
429        if let Some(k) = &self.k8s {
430            let running = k
431                .restart(db_instance_identifier, engine, username, password, db_name)
432                .await?;
433            self.containers
434                .write()
435                .insert(db_instance_identifier.to_string(), running.clone());
436            return Ok(running);
437        }
438        let running = self
439            .containers
440            .read()
441            .get(db_instance_identifier)
442            .cloned()
443            .ok_or(RuntimeError::Unavailable)?;
444
445        let output = tokio::process::Command::new(&self.cli)
446            .args(["restart", &running.container_id])
447            .output()
448            .await
449            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
450
451        if !output.status.success() {
452            return Err(RuntimeError::ContainerStartFailed(format!(
453                "container restart failed: {}",
454                String::from_utf8_lossy(&output.stderr).trim()
455            )));
456        }
457
458        let port = match engine {
459            "postgres" => "5432",
460            "mysql" | "mariadb" => "3306",
461            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "1521",
462            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "1433",
463            "db2-se" | "db2-ae" => "50000",
464            _ => "5432", // fallback
465        };
466
467        let host_port = self.lookup_port(&running.container_id, port).await?;
468
469        match engine {
470            "postgres" => {
471                self.wait_for_postgres(username, password, db_name, host_port)
472                    .await?
473            }
474            "mysql" | "mariadb" => {
475                self.wait_for_mysql(username, password, db_name, host_port)
476                    .await?
477            }
478            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
479                self.wait_for_oracle(&running.container_id, host_port)
480                    .await?
481            }
482            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
483                self.wait_for_sqlserver(&running.container_id, host_port)
484                    .await?
485            }
486            "db2-se" | "db2-ae" => self.wait_for_db2(&running.container_id, host_port).await?,
487            _ => {
488                self.wait_for_postgres(username, password, db_name, host_port)
489                    .await?
490            }
491        };
492        let running = RunningDbContainer {
493            container_id: running.container_id,
494            host_port,
495            endpoint_address: "127.0.0.1".to_string(),
496            endpoint_port: host_port,
497        };
498        self.containers
499            .write()
500            .insert(db_instance_identifier.to_string(), running.clone());
501        Ok(running)
502    }
503
504    pub async fn stop_all(&self) {
505        let containers: Vec<String> = {
506            let mut containers = self.containers.write();
507            containers
508                .drain()
509                .map(|(_, container)| container.container_id)
510                .collect()
511        };
512        for container_id in containers {
513            self.remove_container(&container_id).await;
514        }
515    }
516
517    async fn lookup_port(&self, container_id: &str, port: &str) -> Result<u16, RuntimeError> {
518        let port_output = tokio::process::Command::new(&self.cli)
519            .args(["port", container_id, port])
520            .output()
521            .await
522            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
523
524        let port_str = String::from_utf8_lossy(&port_output.stdout);
525        port_str
526            .trim()
527            .rsplit(':')
528            .next()
529            .and_then(|value| value.parse::<u16>().ok())
530            .ok_or_else(|| {
531                RuntimeError::ContainerStartFailed(format!(
532                    "could not determine container port from '{}'",
533                    port_str.trim()
534                ))
535            })
536    }
537
538    async fn wait_for_postgres(
539        &self,
540        username: &str,
541        password: &str,
542        db_name: &str,
543        host_port: u16,
544    ) -> Result<(), RuntimeError> {
545        for _ in 0..40 {
546            tokio::time::sleep(Duration::from_millis(500)).await;
547            let connection_string = format!(
548                "host=127.0.0.1 port={host_port} user={username} password={password} dbname={db_name}"
549            );
550            if let Ok((client, connection)) =
551                tokio_postgres::connect(&connection_string, NoTls).await
552            {
553                tokio::spawn(async move {
554                    let _ = connection.await;
555                });
556                if client.simple_query("SELECT 1").await.is_ok() {
557                    return Ok(());
558                }
559            }
560        }
561
562        Err(RuntimeError::ContainerStartFailed(
563            "postgres container did not become ready within 20 seconds".to_string(),
564        ))
565    }
566
567    async fn wait_for_mysql(
568        &self,
569        username: &str,
570        password: &str,
571        db_name: &str,
572        host_port: u16,
573    ) -> Result<(), RuntimeError> {
574        use mysql_async::prelude::*;
575        use mysql_async::OptsBuilder;
576
577        for attempt in 1..=40 {
578            let opts = OptsBuilder::default()
579                .ip_or_hostname("127.0.0.1")
580                .tcp_port(host_port)
581                .user(Some(username))
582                .pass(Some(password))
583                .db_name(Some(db_name));
584
585            match mysql_async::Conn::new(opts).await {
586                Ok(mut conn) => {
587                    if conn.query_drop("SELECT 1").await.is_ok() {
588                        let _ = conn.disconnect().await;
589                        return Ok(());
590                    }
591                }
592                Err(_) => {
593                    if attempt < 40 {
594                        tokio::time::sleep(Duration::from_millis(500)).await;
595                    }
596                    continue;
597                }
598            }
599        }
600
601        Err(RuntimeError::ContainerStartFailed(
602            "MySQL/MariaDB container did not become ready within 20 seconds".to_string(),
603        ))
604    }
605
606    /// Wait for Oracle Database Free to finish bootstrapping. The
607    /// `gvenzl/oracle-free` image prints `DATABASE IS READY TO USE!`
608    /// to stdout once the listener accepts connections, so we poll
609    /// `docker logs` until that marker appears (or the deadline elapses).
610    /// Oracle XE/Free typically takes 30-90 seconds on first start.
611    async fn wait_for_oracle(
612        &self,
613        container_id: &str,
614        host_port: u16,
615    ) -> Result<(), RuntimeError> {
616        self.wait_for_log_marker(container_id, "DATABASE IS READY TO USE!", 240)
617            .await?;
618        self.wait_for_tcp(host_port, 30).await
619    }
620
621    /// Wait for SQL Server to be ready. The official mssql/server image
622    /// emits `SQL Server is now ready for client connections.` once it
623    /// accepts TCP connections on 1433.
624    async fn wait_for_sqlserver(
625        &self,
626        container_id: &str,
627        host_port: u16,
628    ) -> Result<(), RuntimeError> {
629        self.wait_for_log_marker(
630            container_id,
631            "SQL Server is now ready for client connections",
632            180,
633        )
634        .await?;
635        self.wait_for_tcp(host_port, 30).await
636    }
637
638    /// Wait for Db2 Community Edition to finish setup. The
639    /// `icr.io/db2_community/db2` image prints `Setup has completed.`
640    /// once the instance is up and the database has been created.
641    async fn wait_for_db2(&self, container_id: &str, host_port: u16) -> Result<(), RuntimeError> {
642        self.wait_for_log_marker(container_id, "Setup has completed", 360)
643            .await?;
644        self.wait_for_tcp(host_port, 60).await
645    }
646
647    /// Poll `docker logs <container>` until the supplied marker appears
648    /// in stdout or stderr. `deadline_secs` caps total wait.
649    async fn wait_for_log_marker(
650        &self,
651        container_id: &str,
652        marker: &str,
653        deadline_secs: u64,
654    ) -> Result<(), RuntimeError> {
655        let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
656        while std::time::Instant::now() < deadline {
657            let output = tokio::process::Command::new(&self.cli)
658                .args(["logs", container_id])
659                .output()
660                .await
661                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
662            let stdout = String::from_utf8_lossy(&output.stdout);
663            let stderr = String::from_utf8_lossy(&output.stderr);
664            if stdout.contains(marker) || stderr.contains(marker) {
665                return Ok(());
666            }
667            tokio::time::sleep(Duration::from_secs(2)).await;
668        }
669        Err(RuntimeError::ContainerStartFailed(format!(
670            "container did not log '{}' within {} seconds",
671            marker, deadline_secs
672        )))
673    }
674
675    /// TCP-probe the host port until it accepts a connection or the
676    /// deadline elapses. Use after `wait_for_log_marker` since the
677    /// listener may bind a moment after the readiness log line.
678    async fn wait_for_tcp(&self, host_port: u16, deadline_secs: u64) -> Result<(), RuntimeError> {
679        let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
680        while std::time::Instant::now() < deadline {
681            if tokio::net::TcpStream::connect(("127.0.0.1", host_port))
682                .await
683                .is_ok()
684            {
685                return Ok(());
686            }
687            tokio::time::sleep(Duration::from_millis(500)).await;
688        }
689        Err(RuntimeError::ContainerStartFailed(format!(
690            "TCP probe to 127.0.0.1:{} did not succeed within {}s",
691            host_port, deadline_secs
692        )))
693    }
694
695    async fn remove_container(&self, container_id: &str) {
696        let _ = tokio::process::Command::new(&self.cli)
697            .args(["rm", "-f", container_id])
698            .output()
699            .await;
700    }
701
702    /// Build (or reuse) the fakecloud-postgres image for a given major
703    /// version. The image bakes plpython3u plus the aws_commons and
704    /// aws_lambda extension files so users can run
705    /// `CREATE EXTENSION aws_lambda CASCADE` inside any database.
706    /// Tag includes a content hash so changes to the embedded assets
707    /// invalidate the local cache automatically.
708    /// Resolve the postgres image tag for a given major version. Tries
709    /// (in order): in-process cache, `docker image inspect` for a copy
710    /// already on the daemon, `docker pull` of the prebuilt image
711    /// published by CI, and finally a local `docker build` from the
712    /// embedded Dockerfile + extension assets. The pull path is the
713    /// happy path for end users on tagged releases; the build path
714    /// covers dev / unreleased versions / airgapped setups.
715    ///
716    /// Honors:
717    /// - `FAKECLOUD_POSTGRES_REGISTRY` — registry prefix (default
718    ///   `ghcr.io/faiscadev`); useful for private mirrors.
719    /// - `FAKECLOUD_REBUILD_POSTGRES_IMAGE` — when set to a non-empty
720    ///   value, skip inspect + pull and force a fresh local build.
721    ///   Use after editing the embedded Dockerfile or extension SQL.
722    pub(crate) async fn ensure_postgres_image(
723        &self,
724        major_version: &str,
725    ) -> Result<String, RuntimeError> {
726        let tag = bridge_image_tag("fakecloud-postgres", major_version);
727        self.ensure_bridge_image(&tag, |tag| async move {
728            self.build_postgres_image_local(major_version, &tag).await
729        })
730        .await
731    }
732
733    async fn docker_image_exists(&self, tag: &str) -> bool {
734        tokio::process::Command::new(&self.cli)
735            .args(["image", "inspect", tag])
736            .stdout(std::process::Stdio::null())
737            .stderr(std::process::Stdio::null())
738            .status()
739            .await
740            .map(|status| status.success())
741            .unwrap_or(false)
742    }
743
744    async fn try_pull_image(&self, tag: &str) -> bool {
745        tracing::info!(tag = %tag, "Pulling prebuilt fakecloud-postgres image");
746        let output = match tokio::process::Command::new(&self.cli)
747            .args(["pull", tag])
748            .output()
749            .await
750        {
751            Ok(output) => output,
752            Err(e) => {
753                tracing::debug!(tag = %tag, error = %e, "docker pull failed to spawn");
754                return false;
755            }
756        };
757        if output.status.success() {
758            return true;
759        }
760        tracing::info!(
761            tag = %tag,
762            stderr = %String::from_utf8_lossy(&output.stderr).trim(),
763            "Prebuilt postgres image not available, falling back to local build"
764        );
765        false
766    }
767
768    async fn build_postgres_image_local(
769        &self,
770        major_version: &str,
771        tag: &str,
772    ) -> Result<(), RuntimeError> {
773        let assets: [(&str, &str); 8] = [
774            ("Dockerfile", POSTGRES_DOCKERFILE),
775            ("aws_commons.control", AWS_COMMONS_CONTROL),
776            ("aws_commons--1.1.sql", AWS_COMMONS_SQL),
777            ("aws_commons--1.0--1.1.sql", AWS_COMMONS_UPGRADE_SQL),
778            ("aws_lambda.control", AWS_LAMBDA_CONTROL),
779            ("aws_lambda--1.0.sql", AWS_LAMBDA_SQL),
780            ("aws_s3.control", AWS_S3_CONTROL),
781            ("aws_s3--1.0.sql", AWS_S3_SQL),
782        ];
783        self.build_image_local(
784            tag,
785            &assets,
786            &format!("PG_VERSION={major_version}"),
787            "fakecloud-postgres",
788        )
789        .await
790    }
791
792    /// Pull-first / build-fallback for the prebuilt fakecloud-mysql
793    /// image. Mirrors `ensure_postgres_image`. The image bakes a small
794    /// libcurl-backed UDF + Aurora-compatible `mysql.lambda_async` /
795    /// `mysql.lambda_sync` stored procedures.
796    pub(crate) async fn ensure_mysql_image(
797        &self,
798        major_version: &str,
799    ) -> Result<String, RuntimeError> {
800        let tag = bridge_image_tag("fakecloud-mysql", major_version);
801        self.ensure_bridge_image(&tag, |tag| async move {
802            self.build_mysql_image_local(major_version, &tag).await
803        })
804        .await
805    }
806
807    pub(crate) async fn ensure_mariadb_image(
808        &self,
809        major_version: &str,
810    ) -> Result<String, RuntimeError> {
811        let tag = bridge_image_tag("fakecloud-mariadb", major_version);
812        self.ensure_bridge_image(&tag, |tag| async move {
813            self.build_mariadb_image_local(major_version, &tag).await
814        })
815        .await
816    }
817
818    /// Shared pull-first/build-fallback orchestration used by every
819    /// bridge-aware engine. Holds the per-tag mutex, checks the local
820    /// daemon first, then tries the prebuilt image, and finally
821    /// invokes the supplied local-build closure.
822    async fn ensure_bridge_image<F, Fut>(
823        &self,
824        tag: &str,
825        build_local: F,
826    ) -> Result<String, RuntimeError>
827    where
828        F: FnOnce(String) -> Fut,
829        Fut: std::future::Future<Output = Result<(), RuntimeError>>,
830    {
831        let lock = {
832            let mut cache = self.image_cache.write();
833            cache
834                .entry(tag.to_string())
835                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(false)))
836                .clone()
837        };
838        let mut resolved = lock.lock().await;
839        if *resolved {
840            return Ok(tag.to_string());
841        }
842
843        let force_rebuild = std::env::var("FAKECLOUD_REBUILD_POSTGRES_IMAGE")
844            .map(|v| !v.is_empty())
845            .unwrap_or(false);
846
847        if !force_rebuild {
848            if self.docker_image_exists(tag).await {
849                *resolved = true;
850                return Ok(tag.to_string());
851            }
852            if self.try_pull_image(tag).await {
853                *resolved = true;
854                return Ok(tag.to_string());
855            }
856        }
857
858        build_local(tag.to_string()).await?;
859        *resolved = true;
860        Ok(tag.to_string())
861    }
862
863    async fn build_mysql_image_local(
864        &self,
865        major_version: &str,
866        tag: &str,
867    ) -> Result<(), RuntimeError> {
868        let assets: [(&str, &str); 4] = [
869            ("Dockerfile", MYSQL_DOCKERFILE),
870            ("fakecloud_udf.c", MYSQL_UDF_C),
871            ("fakecloud-bootstrap.sh", MYSQL_BOOTSTRAP_SH),
872            ("99-fakecloud-bootstrap.sql.tmpl", MYSQL_BOOTSTRAP_SQL),
873        ];
874        self.build_image_local(
875            tag,
876            &assets,
877            &format!("MYSQL_VERSION={major_version}"),
878            "fakecloud-mysql",
879        )
880        .await
881    }
882
883    async fn build_mariadb_image_local(
884        &self,
885        major_version: &str,
886        tag: &str,
887    ) -> Result<(), RuntimeError> {
888        let assets: [(&str, &str); 4] = [
889            ("Dockerfile", MARIADB_DOCKERFILE),
890            ("fakecloud_udf.c", MARIADB_UDF_C),
891            ("fakecloud-bootstrap.sh", MARIADB_BOOTSTRAP_SH),
892            ("99-fakecloud-bootstrap.sql.tmpl", MARIADB_BOOTSTRAP_SQL),
893        ];
894        self.build_image_local(
895            tag,
896            &assets,
897            &format!("MARIADB_VERSION={major_version}"),
898            "fakecloud-mariadb",
899        )
900        .await
901    }
902
903    async fn build_image_local(
904        &self,
905        tag: &str,
906        assets: &[(&str, &str)],
907        build_arg: &str,
908        image_label: &str,
909    ) -> Result<(), RuntimeError> {
910        let build_dir =
911            tempfile::tempdir().map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
912        for (name, contents) in assets {
913            tokio::fs::write(build_dir.path().join(name), contents)
914                .await
915                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
916        }
917
918        tracing::info!(
919            tag = %tag,
920            image = %image_label,
921            "Building {image_label} image locally (first use can take ~60s)"
922        );
923
924        let output = tokio::process::Command::new(&self.cli)
925            .args(["build", "--build-arg", build_arg, "-t", tag, "."])
926            .current_dir(build_dir.path())
927            .output()
928            .await
929            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
930
931        if !output.status.success() {
932            return Err(RuntimeError::ContainerStartFailed(format!(
933                "docker build for {} failed: {}",
934                tag,
935                String::from_utf8_lossy(&output.stderr).trim()
936            )));
937        }
938
939        Ok(())
940    }
941
942    pub async fn dump_database(
943        &self,
944        db_instance_identifier: &str,
945        engine: &str,
946        username: &str,
947        password: &str,
948        db_name: &str,
949    ) -> Result<Vec<u8>, RuntimeError> {
950        let container = self
951            .containers
952            .read()
953            .get(db_instance_identifier)
954            .cloned()
955            .ok_or(RuntimeError::Unavailable)?;
956
957        if let Some(k) = &self.k8s {
958            return k
959                .dump_database(&container.container_id, engine, username, password, db_name)
960                .await;
961        }
962
963        let args: Vec<String> = match engine {
964            "mysql" | "mariadb" => vec![
965                "exec".into(),
966                container.container_id.clone(),
967                "mysqldump".into(),
968                "-u".into(),
969                username.into(),
970                format!("-p{password}"),
971                db_name.into(),
972            ],
973            "postgres" => vec![
974                "exec".into(),
975                container.container_id.clone(),
976                "pg_dump".into(),
977                "-U".into(),
978                username.into(),
979                "-d".into(),
980                db_name.into(),
981                "--no-password".into(),
982            ],
983            // Heavy engines don't ship with a portable dump CLI we can
984            // shell out to from the host the same way pg_dump and
985            // mysqldump are guaranteed available. Surface a clear
986            // error so callers (snapshot/read-replica) don't silently
987            // run the wrong tool against an Oracle/SQL Server/Db2
988            // container.
989            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
990            | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
991                return Err(RuntimeError::ContainerStartFailed(format!(
992                    "engine {engine} is not yet supported by the snapshot/read-replica path; \
993                     emulator stores the API state but cannot dump the underlying database"
994                )));
995            }
996            other => {
997                return Err(RuntimeError::ContainerStartFailed(format!(
998                    "engine {other} is not supported by dump_database"
999                )));
1000            }
1001        };
1002
1003        let output = tokio::process::Command::new(&self.cli)
1004            .args(&args)
1005            .output()
1006            .await
1007            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1008
1009        if !output.status.success() {
1010            return Err(RuntimeError::ContainerStartFailed(format!(
1011                "dump failed: {}",
1012                String::from_utf8_lossy(&output.stderr).trim()
1013            )));
1014        }
1015
1016        Ok(output.stdout)
1017    }
1018
1019    /// Cat a file inside the running container by absolute path.
1020    /// Returns `Unavailable` when the runtime/daemon is missing or the
1021    /// instance has no live container; returns `ContainerStartFailed`
1022    /// when the file is missing or unreadable. Callers map those to the
1023    /// AWS-shaped responses.
1024    pub async fn read_log_file(
1025        &self,
1026        db_instance_identifier: &str,
1027        container_path: &str,
1028    ) -> Result<Vec<u8>, RuntimeError> {
1029        let container = self
1030            .containers
1031            .read()
1032            .get(db_instance_identifier)
1033            .cloned()
1034            .ok_or(RuntimeError::Unavailable)?;
1035
1036        if let Some(k) = &self.k8s {
1037            return k.read_file(&container.container_id, container_path).await;
1038        }
1039
1040        let output = tokio::process::Command::new(&self.cli)
1041            .args(["exec", &container.container_id, "cat", container_path])
1042            .output()
1043            .await
1044            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1045
1046        if !output.status.success() {
1047            return Err(RuntimeError::ContainerStartFailed(format!(
1048                "cat {container_path} failed: {}",
1049                String::from_utf8_lossy(&output.stderr).trim()
1050            )));
1051        }
1052
1053        Ok(output.stdout)
1054    }
1055
1056    pub async fn restore_database(
1057        &self,
1058        db_instance_identifier: &str,
1059        engine: &str,
1060        username: &str,
1061        password: &str,
1062        db_name: &str,
1063        dump_data: &[u8],
1064    ) -> Result<(), RuntimeError> {
1065        let container = self
1066            .containers
1067            .read()
1068            .get(db_instance_identifier)
1069            .cloned()
1070            .ok_or(RuntimeError::Unavailable)?;
1071
1072        if let Some(k) = &self.k8s {
1073            return k
1074                .restore_database(
1075                    &container.container_id,
1076                    engine,
1077                    username,
1078                    password,
1079                    db_name,
1080                    dump_data,
1081                )
1082                .await;
1083        }
1084
1085        let args: Vec<String> = match engine {
1086            "mysql" | "mariadb" => vec![
1087                "exec".into(),
1088                "-i".into(),
1089                container.container_id.clone(),
1090                "mysql".into(),
1091                "-u".into(),
1092                username.into(),
1093                format!("-p{password}"),
1094                db_name.into(),
1095            ],
1096            "postgres" => vec![
1097                "exec".into(),
1098                "-i".into(),
1099                container.container_id.clone(),
1100                "psql".into(),
1101                "-U".into(),
1102                username.into(),
1103                "-d".into(),
1104                db_name.into(),
1105                "--no-password".into(),
1106                "-v".into(),
1107                "ON_ERROR_STOP=1".into(),
1108            ],
1109            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
1110            | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
1111                return Err(RuntimeError::ContainerStartFailed(format!(
1112                    "engine {engine} is not yet supported by the snapshot-restore path"
1113                )));
1114            }
1115            other => {
1116                return Err(RuntimeError::ContainerStartFailed(format!(
1117                    "engine {other} is not supported by restore_database"
1118                )));
1119            }
1120        };
1121
1122        let mut child = tokio::process::Command::new(&self.cli)
1123            .args(&args)
1124            .stdin(std::process::Stdio::piped())
1125            .stdout(std::process::Stdio::piped())
1126            .stderr(std::process::Stdio::piped())
1127            .spawn()
1128            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1129
1130        if let Some(mut stdin) = child.stdin.take() {
1131            use tokio::io::AsyncWriteExt;
1132            stdin
1133                .write_all(dump_data)
1134                .await
1135                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1136            drop(stdin);
1137        }
1138
1139        let output = child
1140            .wait_with_output()
1141            .await
1142            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1143
1144        if !output.status.success() {
1145            return Err(RuntimeError::ContainerStartFailed(format!(
1146                "restore failed: {}",
1147                String::from_utf8_lossy(&output.stderr).trim()
1148            )));
1149        }
1150
1151        Ok(())
1152    }
1153}
1154
/// Probe whether the container CLI `cli` is usable by running
/// `<cli> info` with output discarded. A spawn error or a non-zero exit
/// both count as "not available".
fn cli_available(cli: &str) -> bool {
    let probe = std::process::Command::new(cli)
        .arg("info")
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status();
    match probe {
        Ok(status) => status.success(),
        Err(_) => false,
    }
}
1164
/// On Linux Docker bridge networks, ask the daemon for the bridge
/// gateway IP so containers can reach the host's loopback. macOS and
/// Windows use the magic `host-gateway` alias instead.
///
/// Returns the first IPv4 gateway configured on the `bridge` network, or
/// `None` when the CLI cannot be run, exits non-zero, or reports no IPv4
/// gateway. Each IPAM config's gateway is printed followed by a space, so
/// a dual-stack bridge (IPv4 + IPv6 configs) yields separate tokens
/// instead of one concatenated string.
fn detect_bridge_gateway(cli: &str) -> Option<String> {
    let output = std::process::Command::new(cli)
        .args([
            "network",
            "inspect",
            "bridge",
            "--format",
            "{{range .IPAM.Config}}{{.Gateway}} {{end}}",
        ])
        .output()
        .ok()?;
    if !output.status.success() {
        return None;
    }
    first_ipv4_gateway(&String::from_utf8_lossy(&output.stdout))
}

/// Return the first whitespace-separated token of `inspect_output` that
/// parses as an IPv4 address; IPv6 gateways and empty entries are skipped.
fn first_ipv4_gateway(inspect_output: &str) -> Option<String> {
    inspect_output
        .split_whitespace()
        .find(|candidate| candidate.parse::<std::net::Ipv4Addr>().is_ok())
        .map(str::to_string)
}
1188
1189/// Build the prebuilt-image reference for a given engine + major
1190/// version. Uses `<registry>/<image>:<major>-<fakecloud-version>`,
1191/// where the registry comes from `FAKECLOUD_POSTGRES_REGISTRY` (kept
1192/// historical name; defaults to the public `ghcr.io/faiscadev`).
1193/// The version pin guarantees the runtime asks the daemon for the
1194/// same image CI publishes for this fakecloud release; mismatched
1195/// assets force a local rebuild via the fall-through in
1196/// `ensure_bridge_image`.
1197pub(crate) fn bridge_image_tag(image: &str, major_version: &str) -> String {
1198    let registry = std::env::var("FAKECLOUD_POSTGRES_REGISTRY")
1199        .unwrap_or_else(|_| DEFAULT_POSTGRES_REGISTRY.to_string());
1200    let registry = registry.trim_end_matches('/');
1201    format!(
1202        "{}/{}:{}-{}",
1203        registry,
1204        image,
1205        major_version,
1206        env!("CARGO_PKG_VERSION")
1207    )
1208}
1209
#[cfg(test)]
mod tests {
    use super::*;

    const REGISTRY_ENV: &str = "FAKECLOUD_POSTGRES_REGISTRY";

    /// Holds the pre-test value of `FAKECLOUD_POSTGRES_REGISTRY` and
    /// writes it back on drop, so the variable is restored even when an
    /// assertion panics mid-test and cannot leak into other tests.
    struct RegistryEnvRestore(Option<String>);

    impl Drop for RegistryEnvRestore {
        fn drop(&mut self) {
            match self.0.take() {
                Some(v) => std::env::set_var(REGISTRY_ENV, v),
                None => std::env::remove_var(REGISTRY_ENV),
            }
        }
    }

    /// Single test (rather than three) so the cases run sequentially —
    /// `bridge_image_tag` reads a process-global env var and parallel
    /// `cargo test` workers would race over it otherwise.
    #[test]
    fn bridge_image_tag_resolves_registry_overrides() {
        let _restore = RegistryEnvRestore(std::env::var(REGISTRY_ENV).ok());

        std::env::remove_var(REGISTRY_ENV);
        assert_eq!(
            bridge_image_tag("fakecloud-postgres", "16"),
            format!(
                "ghcr.io/faiscadev/fakecloud-postgres:16-{}",
                env!("CARGO_PKG_VERSION")
            )
        );
        assert_eq!(
            bridge_image_tag("fakecloud-mysql", "8.0"),
            format!(
                "ghcr.io/faiscadev/fakecloud-mysql:8.0-{}",
                env!("CARGO_PKG_VERSION")
            )
        );
        assert_eq!(
            bridge_image_tag("fakecloud-mariadb", "10.11"),
            format!(
                "ghcr.io/faiscadev/fakecloud-mariadb:10.11-{}",
                env!("CARGO_PKG_VERSION")
            )
        );

        std::env::set_var(REGISTRY_ENV, "registry.example.com/team");
        assert_eq!(
            bridge_image_tag("fakecloud-postgres", "15"),
            format!(
                "registry.example.com/team/fakecloud-postgres:15-{}",
                env!("CARGO_PKG_VERSION")
            )
        );

        std::env::set_var(REGISTRY_ENV, "registry.example.com/team/");
        assert_eq!(
            bridge_image_tag("fakecloud-postgres", "13"),
            format!(
                "registry.example.com/team/fakecloud-postgres:13-{}",
                env!("CARGO_PKG_VERSION")
            )
        );
    }
}