Skip to main content

fakecloud_rds/
runtime.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use parking_lot::RwLock;
6use tokio_postgres::NoTls;
7
/// Dockerfile for the fakecloud-postgres image. Embedded at compile time and
/// written into a temporary build context by
/// `RdsRuntime::build_postgres_image_local`, which passes the requested major
/// version as the `PG_VERSION` build-arg.
const POSTGRES_DOCKERFILE: &str = include_str!("../assets/postgres/Dockerfile");
/// Control file for the `aws_commons` extension; written into the image build
/// context as `aws_commons.control`.
const AWS_COMMONS_CONTROL: &str = include_str!("../assets/postgres/aws_commons.control");
/// SQL script for the `aws_commons` extension (version 1.0, per the file
/// name); written into the image build context.
const AWS_COMMONS_SQL: &str = include_str!("../assets/postgres/aws_commons--1.0.sql");
/// Control file for the `aws_lambda` extension; written into the image build
/// context as `aws_lambda.control`.
const AWS_LAMBDA_CONTROL: &str = include_str!("../assets/postgres/aws_lambda.control");
/// SQL script for the `aws_lambda` extension (version 1.0, per the file
/// name); written into the image build context.
const AWS_LAMBDA_SQL: &str = include_str!("../assets/postgres/aws_lambda--1.0.sql");

/// Default registry that hosts the prebuilt postgres images. CI publishes
/// to `ghcr.io/faiscadev/fakecloud-postgres:<major>-<version>` on each
/// release tag (see `.github/workflows/docker-rds-images.yml`).
/// Override with the `FAKECLOUD_POSTGRES_REGISTRY` env var (e.g. for
/// private mirrors); set `FAKECLOUD_REBUILD_POSTGRES_IMAGE=1` to force
/// a local rebuild even when the published tag is reachable.
// NOTE(review): not referenced in the visible part of this file; presumably
// consumed by `postgres_image_tag` — confirm.
const DEFAULT_POSTGRES_REGISTRY: &str = "ghcr.io/faiscadev";
21
/// Handle to a database container that `RdsRuntime` started and is tracking.
#[derive(Clone, Debug)]
pub struct RunningDbContainer {
    /// Container ID as printed by the container CLI's `create` command.
    pub container_id: String,
    /// Host port the engine's listening port is published on.
    pub host_port: u16,
}
27
28pub struct RdsRuntime {
29    cli: String,
30    containers: RwLock<HashMap<String, RunningDbContainer>>,
31    instance_id: String,
32    host_ip: String,
33    server_port: u16,
34    image_cache: RwLock<HashMap<String, Arc<tokio::sync::Mutex<bool>>>>,
35}
36
37#[derive(Debug, thiserror::Error)]
38pub enum RuntimeError {
39    #[error("container runtime is unavailable")]
40    Unavailable,
41    #[error("container failed to start: {0}")]
42    ContainerStartFailed(String),
43}
44
45impl RdsRuntime {
46    pub fn new(server_port: u16) -> Option<Self> {
47        let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
48            if cli_available(&cli) {
49                cli
50            } else {
51                return None;
52            }
53        } else if cli_available("docker") {
54            "docker".to_string()
55        } else if cli_available("podman") {
56            "podman".to_string()
57        } else {
58            return None;
59        };
60
61        // Match Lambda runtime container-to-host networking: Linux uses the
62        // bridge gateway IP directly, macOS/Windows use Docker Desktop's
63        // host-gateway alias. Containers reach fakecloud at host.docker.internal.
64        let host_ip = if cfg!(target_os = "linux") {
65            detect_bridge_gateway(&cli).unwrap_or_else(|| "172.17.0.1".to_string())
66        } else {
67            "host-gateway".to_string()
68        };
69
70        Some(Self {
71            cli,
72            containers: RwLock::new(HashMap::new()),
73            instance_id: format!("fakecloud-{}", std::process::id()),
74            host_ip,
75            server_port,
76            image_cache: RwLock::new(HashMap::new()),
77        })
78    }
79
80    pub fn cli_name(&self) -> &str {
81        &self.cli
82    }
83
84    #[allow(clippy::too_many_arguments)]
85    pub async fn ensure_postgres(
86        &self,
87        db_instance_identifier: &str,
88        engine: &str,
89        engine_version: &str,
90        username: &str,
91        password: &str,
92        db_name: &str,
93        account_id: &str,
94        region: &str,
95    ) -> Result<RunningDbContainer, RuntimeError> {
96        self.stop_container(db_instance_identifier).await;
97
98        // Determine Docker image and port based on engine. The postgres
99        // image is built locally (lazily) so we can ship the aws_lambda /
100        // aws_commons extensions plus plpython3u; other engines stay on
101        // upstream images.
102        let (image, port, env_vars, postgres_major) = match engine {
103            "postgres" => {
104                let major_version = engine_version.split('.').next().unwrap_or("16");
105                let image = self.ensure_postgres_image(major_version).await?;
106                let env_vars = vec![
107                    format!("POSTGRES_USER={username}"),
108                    format!("POSTGRES_PASSWORD={password}"),
109                    format!("POSTGRES_DB={db_name}"),
110                    format!(
111                        "FAKECLOUD_ENDPOINT=http://host.docker.internal:{}",
112                        self.server_port
113                    ),
114                    format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
115                    format!("FAKECLOUD_REGION={region}"),
116                ];
117                (image, "5432", env_vars, Some(major_version.to_string()))
118            }
119            "mysql" => {
120                let major_version = if engine_version.starts_with("5.7") {
121                    "5.7"
122                } else {
123                    "8.0"
124                };
125                let image = format!("mysql:{}", major_version);
126                let env_vars = vec![
127                    format!("MYSQL_ROOT_PASSWORD={password}"),
128                    format!("MYSQL_USER={username}"),
129                    format!("MYSQL_PASSWORD={password}"),
130                    format!("MYSQL_DATABASE={db_name}"),
131                ];
132                (image, "3306", env_vars, None)
133            }
134            "mariadb" => {
135                let major_version = if engine_version.starts_with("10.11") {
136                    "10.11"
137                } else {
138                    "10.6"
139                };
140                let image = format!("mariadb:{}", major_version);
141                let env_vars = vec![
142                    format!("MARIADB_ROOT_PASSWORD={password}"),
143                    format!("MARIADB_USER={username}"),
144                    format!("MARIADB_PASSWORD={password}"),
145                    format!("MARIADB_DATABASE={db_name}"),
146                ];
147                (image, "3306", env_vars, None)
148            }
149            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
150                // Oracle Database Free is the no-cost dev edition shipped by
151                // Oracle. The container exposes a "FREEPDB1" pluggable
152                // database and creates the SYSTEM user with the password
153                // from ORACLE_PASSWORD.
154                let image = "gvenzl/oracle-free:23-slim".to_string();
155                let env_vars = vec![
156                    format!("ORACLE_PASSWORD={password}"),
157                    format!("APP_USER={username}"),
158                    format!("APP_USER_PASSWORD={password}"),
159                    format!("ORACLE_DATABASE={db_name}"),
160                ];
161                (image, "1521", env_vars, None)
162            }
163            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
164                // SQL Server Express is free for dev/test with no license
165                // ceiling. SA password must satisfy MSSQL's complexity
166                // requirements (>=8 chars, mix of classes); callers should
167                // supply one that does or the container will refuse to
168                // start.
169                let image = "mcr.microsoft.com/mssql/server:2022-latest".to_string();
170                let env_vars = vec![
171                    "ACCEPT_EULA=Y".to_string(),
172                    format!("MSSQL_SA_PASSWORD={password}"),
173                    "MSSQL_PID=Express".to_string(),
174                ];
175                (image, "1433", env_vars, None)
176            }
177            "db2-se" | "db2-ae" => {
178                // Db2 Community Edition is free under the standard IBM
179                // Community License. The container exposes a single
180                // database named after DBNAME, owned by db2inst1.
181                let image = "icr.io/db2_community/db2:latest".to_string();
182                let env_vars = vec![
183                    "LICENSE=accept".to_string(),
184                    "DB2INSTANCE=db2inst1".to_string(),
185                    format!("DB2INST1_PASSWORD={password}"),
186                    format!("DBNAME={db_name}"),
187                ];
188                (image, "50000", env_vars, None)
189            }
190            _ => {
191                return Err(RuntimeError::ContainerStartFailed(format!(
192                    "Unsupported engine: {}",
193                    engine
194                )))
195            }
196        };
197
198        // Db2 needs --privileged to set kernel parameters during startup.
199        let needs_privileged = matches!(engine, "db2-se" | "db2-ae");
200
201        // Build container create args
202        let mut args = vec![
203            "create".to_string(),
204            "-p".to_string(),
205            format!(":{}", port),
206            "--label".to_string(),
207            format!("fakecloud-rds={db_instance_identifier}"),
208            "--label".to_string(),
209            format!("fakecloud-instance={}", self.instance_id),
210        ];
211
212        if needs_privileged {
213            args.push("--privileged".to_string());
214        }
215
216        // Postgres runs the aws_lambda extension which calls back into
217        // fakecloud over HTTP. Wire the bridge alias so plpython3u code
218        // can resolve host.docker.internal on every platform.
219        if postgres_major.is_some() {
220            args.push("--add-host".to_string());
221            args.push(format!("host.docker.internal:{}", self.host_ip));
222        }
223
224        for env_var in env_vars {
225            args.push("-e".to_string());
226            args.push(env_var);
227        }
228
229        args.push(image);
230
231        let output = tokio::process::Command::new(&self.cli)
232            .args(&args)
233            .output()
234            .await
235            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
236
237        if !output.status.success() {
238            return Err(RuntimeError::ContainerStartFailed(
239                String::from_utf8_lossy(&output.stderr).trim().to_string(),
240            ));
241        }
242
243        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
244        let start_result = tokio::process::Command::new(&self.cli)
245            .args(["start", &container_id])
246            .output()
247            .await
248            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
249
250        if !start_result.status.success() {
251            self.remove_container(&container_id).await;
252            return Err(RuntimeError::ContainerStartFailed(format!(
253                "container start failed: {}",
254                String::from_utf8_lossy(&start_result.stderr).trim()
255            )));
256        }
257
258        let host_port = match self.lookup_port(&container_id, port).await {
259            Ok(host_port) => host_port,
260            Err(error) => {
261                self.remove_container(&container_id).await;
262                return Err(error);
263            }
264        };
265
266        // Wait for database to be ready
267        let wait_result = match engine {
268            "postgres" => {
269                self.wait_for_postgres(username, password, db_name, host_port)
270                    .await
271            }
272            "mysql" | "mariadb" => {
273                self.wait_for_mysql(username, password, db_name, host_port)
274                    .await
275            }
276            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
277                self.wait_for_oracle(&container_id, host_port).await
278            }
279            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
280                self.wait_for_sqlserver(&container_id, host_port).await
281            }
282            "db2-se" | "db2-ae" => self.wait_for_db2(&container_id, host_port).await,
283            _ => unreachable!("engine already validated"),
284        };
285
286        if let Err(error) = wait_result {
287            self.remove_container(&container_id).await;
288            return Err(error);
289        }
290
291        let running = RunningDbContainer {
292            container_id,
293            host_port,
294        };
295        self.containers
296            .write()
297            .insert(db_instance_identifier.to_string(), running.clone());
298        Ok(running)
299    }
300
301    pub async fn stop_container(&self, db_instance_identifier: &str) {
302        let container = self.containers.write().remove(db_instance_identifier);
303        if let Some(container) = container {
304            self.remove_container(&container.container_id).await;
305        }
306    }
307
308    pub async fn restart_container(
309        &self,
310        db_instance_identifier: &str,
311        engine: &str,
312        username: &str,
313        password: &str,
314        db_name: &str,
315    ) -> Result<RunningDbContainer, RuntimeError> {
316        let running = self
317            .containers
318            .read()
319            .get(db_instance_identifier)
320            .cloned()
321            .ok_or(RuntimeError::Unavailable)?;
322
323        let output = tokio::process::Command::new(&self.cli)
324            .args(["restart", &running.container_id])
325            .output()
326            .await
327            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
328
329        if !output.status.success() {
330            return Err(RuntimeError::ContainerStartFailed(format!(
331                "container restart failed: {}",
332                String::from_utf8_lossy(&output.stderr).trim()
333            )));
334        }
335
336        let port = match engine {
337            "postgres" => "5432",
338            "mysql" | "mariadb" => "3306",
339            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "1521",
340            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "1433",
341            "db2-se" | "db2-ae" => "50000",
342            _ => "5432", // fallback
343        };
344
345        let host_port = self.lookup_port(&running.container_id, port).await?;
346
347        match engine {
348            "postgres" => {
349                self.wait_for_postgres(username, password, db_name, host_port)
350                    .await?
351            }
352            "mysql" | "mariadb" => {
353                self.wait_for_mysql(username, password, db_name, host_port)
354                    .await?
355            }
356            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
357                self.wait_for_oracle(&running.container_id, host_port)
358                    .await?
359            }
360            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
361                self.wait_for_sqlserver(&running.container_id, host_port)
362                    .await?
363            }
364            "db2-se" | "db2-ae" => self.wait_for_db2(&running.container_id, host_port).await?,
365            _ => {
366                self.wait_for_postgres(username, password, db_name, host_port)
367                    .await?
368            }
369        };
370        let running = RunningDbContainer {
371            container_id: running.container_id,
372            host_port,
373        };
374        self.containers
375            .write()
376            .insert(db_instance_identifier.to_string(), running.clone());
377        Ok(running)
378    }
379
380    pub async fn stop_all(&self) {
381        let containers: Vec<String> = {
382            let mut containers = self.containers.write();
383            containers
384                .drain()
385                .map(|(_, container)| container.container_id)
386                .collect()
387        };
388        for container_id in containers {
389            self.remove_container(&container_id).await;
390        }
391    }
392
393    async fn lookup_port(&self, container_id: &str, port: &str) -> Result<u16, RuntimeError> {
394        let port_output = tokio::process::Command::new(&self.cli)
395            .args(["port", container_id, port])
396            .output()
397            .await
398            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
399
400        let port_str = String::from_utf8_lossy(&port_output.stdout);
401        port_str
402            .trim()
403            .rsplit(':')
404            .next()
405            .and_then(|value| value.parse::<u16>().ok())
406            .ok_or_else(|| {
407                RuntimeError::ContainerStartFailed(format!(
408                    "could not determine container port from '{}'",
409                    port_str.trim()
410                ))
411            })
412    }
413
414    async fn wait_for_postgres(
415        &self,
416        username: &str,
417        password: &str,
418        db_name: &str,
419        host_port: u16,
420    ) -> Result<(), RuntimeError> {
421        for _ in 0..40 {
422            tokio::time::sleep(Duration::from_millis(500)).await;
423            let connection_string = format!(
424                "host=127.0.0.1 port={host_port} user={username} password={password} dbname={db_name}"
425            );
426            if let Ok((client, connection)) =
427                tokio_postgres::connect(&connection_string, NoTls).await
428            {
429                tokio::spawn(async move {
430                    let _ = connection.await;
431                });
432                if client.simple_query("SELECT 1").await.is_ok() {
433                    return Ok(());
434                }
435            }
436        }
437
438        Err(RuntimeError::ContainerStartFailed(
439            "postgres container did not become ready within 20 seconds".to_string(),
440        ))
441    }
442
443    async fn wait_for_mysql(
444        &self,
445        username: &str,
446        password: &str,
447        db_name: &str,
448        host_port: u16,
449    ) -> Result<(), RuntimeError> {
450        use mysql_async::prelude::*;
451        use mysql_async::OptsBuilder;
452
453        for attempt in 1..=40 {
454            let opts = OptsBuilder::default()
455                .ip_or_hostname("127.0.0.1")
456                .tcp_port(host_port)
457                .user(Some(username))
458                .pass(Some(password))
459                .db_name(Some(db_name));
460
461            match mysql_async::Conn::new(opts).await {
462                Ok(mut conn) => {
463                    if conn.query_drop("SELECT 1").await.is_ok() {
464                        let _ = conn.disconnect().await;
465                        return Ok(());
466                    }
467                }
468                Err(_) => {
469                    if attempt < 40 {
470                        tokio::time::sleep(Duration::from_millis(500)).await;
471                    }
472                    continue;
473                }
474            }
475        }
476
477        Err(RuntimeError::ContainerStartFailed(
478            "MySQL/MariaDB container did not become ready within 20 seconds".to_string(),
479        ))
480    }
481
482    /// Wait for Oracle Database Free to finish bootstrapping. The
483    /// `gvenzl/oracle-free` image prints `DATABASE IS READY TO USE!`
484    /// to stdout once the listener accepts connections, so we poll
485    /// `docker logs` until that marker appears (or the deadline elapses).
486    /// Oracle XE/Free typically takes 30-90 seconds on first start.
487    async fn wait_for_oracle(
488        &self,
489        container_id: &str,
490        host_port: u16,
491    ) -> Result<(), RuntimeError> {
492        self.wait_for_log_marker(container_id, "DATABASE IS READY TO USE!", 240)
493            .await?;
494        self.wait_for_tcp(host_port, 30).await
495    }
496
497    /// Wait for SQL Server to be ready. The official mssql/server image
498    /// emits `SQL Server is now ready for client connections.` once it
499    /// accepts TCP connections on 1433.
500    async fn wait_for_sqlserver(
501        &self,
502        container_id: &str,
503        host_port: u16,
504    ) -> Result<(), RuntimeError> {
505        self.wait_for_log_marker(
506            container_id,
507            "SQL Server is now ready for client connections",
508            180,
509        )
510        .await?;
511        self.wait_for_tcp(host_port, 30).await
512    }
513
514    /// Wait for Db2 Community Edition to finish setup. The
515    /// `icr.io/db2_community/db2` image prints `Setup has completed.`
516    /// once the instance is up and the database has been created.
517    async fn wait_for_db2(&self, container_id: &str, host_port: u16) -> Result<(), RuntimeError> {
518        self.wait_for_log_marker(container_id, "Setup has completed", 360)
519            .await?;
520        self.wait_for_tcp(host_port, 60).await
521    }
522
523    /// Poll `docker logs <container>` until the supplied marker appears
524    /// in stdout or stderr. `deadline_secs` caps total wait.
525    async fn wait_for_log_marker(
526        &self,
527        container_id: &str,
528        marker: &str,
529        deadline_secs: u64,
530    ) -> Result<(), RuntimeError> {
531        let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
532        while std::time::Instant::now() < deadline {
533            let output = tokio::process::Command::new(&self.cli)
534                .args(["logs", container_id])
535                .output()
536                .await
537                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
538            let stdout = String::from_utf8_lossy(&output.stdout);
539            let stderr = String::from_utf8_lossy(&output.stderr);
540            if stdout.contains(marker) || stderr.contains(marker) {
541                return Ok(());
542            }
543            tokio::time::sleep(Duration::from_secs(2)).await;
544        }
545        Err(RuntimeError::ContainerStartFailed(format!(
546            "container did not log '{}' within {} seconds",
547            marker, deadline_secs
548        )))
549    }
550
551    /// TCP-probe the host port until it accepts a connection or the
552    /// deadline elapses. Use after `wait_for_log_marker` since the
553    /// listener may bind a moment after the readiness log line.
554    async fn wait_for_tcp(&self, host_port: u16, deadline_secs: u64) -> Result<(), RuntimeError> {
555        let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
556        while std::time::Instant::now() < deadline {
557            if tokio::net::TcpStream::connect(("127.0.0.1", host_port))
558                .await
559                .is_ok()
560            {
561                return Ok(());
562            }
563            tokio::time::sleep(Duration::from_millis(500)).await;
564        }
565        Err(RuntimeError::ContainerStartFailed(format!(
566            "TCP probe to 127.0.0.1:{} did not succeed within {}s",
567            host_port, deadline_secs
568        )))
569    }
570
571    async fn remove_container(&self, container_id: &str) {
572        let _ = tokio::process::Command::new(&self.cli)
573            .args(["rm", "-f", container_id])
574            .output()
575            .await;
576    }
577
578    /// Build (or reuse) the fakecloud-postgres image for a given major
579    /// version. The image bakes plpython3u plus the aws_commons and
580    /// aws_lambda extension files so users can run
581    /// `CREATE EXTENSION aws_lambda CASCADE` inside any database.
582    /// Tag includes a content hash so changes to the embedded assets
583    /// invalidate the local cache automatically.
584    /// Resolve the postgres image tag for a given major version. Tries
585    /// (in order): in-process cache, `docker image inspect` for a copy
586    /// already on the daemon, `docker pull` of the prebuilt image
587    /// published by CI, and finally a local `docker build` from the
588    /// embedded Dockerfile + extension assets. The pull path is the
589    /// happy path for end users on tagged releases; the build path
590    /// covers dev / unreleased versions / airgapped setups.
591    ///
592    /// Honors:
593    /// - `FAKECLOUD_POSTGRES_REGISTRY` — registry prefix (default
594    ///   `ghcr.io/faiscadev`); useful for private mirrors.
595    /// - `FAKECLOUD_REBUILD_POSTGRES_IMAGE` — when set to a non-empty
596    ///   value, skip inspect + pull and force a fresh local build.
597    ///   Use after editing the embedded Dockerfile or extension SQL.
598    pub(crate) async fn ensure_postgres_image(
599        &self,
600        major_version: &str,
601    ) -> Result<String, RuntimeError> {
602        let tag = postgres_image_tag(major_version);
603
604        // Per-tag mutex so concurrent first-creates don't all shell out
605        // to docker. Inner bool tracks whether resolution has succeeded
606        // in this process (regardless of whether it landed via inspect,
607        // pull, or build).
608        let lock = {
609            let mut cache = self.image_cache.write();
610            cache
611                .entry(tag.clone())
612                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(false)))
613                .clone()
614        };
615        let mut resolved = lock.lock().await;
616        if *resolved {
617            return Ok(tag);
618        }
619
620        let force_rebuild = std::env::var("FAKECLOUD_REBUILD_POSTGRES_IMAGE")
621            .map(|v| !v.is_empty())
622            .unwrap_or(false);
623
624        if !force_rebuild {
625            // Already on the daemon (prior pull, prior build, prior
626            // session)? Use it as-is.
627            if self.docker_image_exists(&tag).await {
628                *resolved = true;
629                return Ok(tag);
630            }
631
632            // Try the prebuilt image published by CI. Any failure
633            // (404 for unreleased version, network error, auth) falls
634            // through to the local build branch.
635            if self.try_pull_image(&tag).await {
636                *resolved = true;
637                return Ok(tag);
638            }
639        }
640
641        self.build_postgres_image_local(major_version, &tag).await?;
642        *resolved = true;
643        Ok(tag)
644    }
645
646    async fn docker_image_exists(&self, tag: &str) -> bool {
647        tokio::process::Command::new(&self.cli)
648            .args(["image", "inspect", tag])
649            .stdout(std::process::Stdio::null())
650            .stderr(std::process::Stdio::null())
651            .status()
652            .await
653            .map(|status| status.success())
654            .unwrap_or(false)
655    }
656
657    async fn try_pull_image(&self, tag: &str) -> bool {
658        tracing::info!(tag = %tag, "Pulling prebuilt fakecloud-postgres image");
659        let output = match tokio::process::Command::new(&self.cli)
660            .args(["pull", tag])
661            .output()
662            .await
663        {
664            Ok(output) => output,
665            Err(e) => {
666                tracing::debug!(tag = %tag, error = %e, "docker pull failed to spawn");
667                return false;
668            }
669        };
670        if output.status.success() {
671            return true;
672        }
673        tracing::info!(
674            tag = %tag,
675            stderr = %String::from_utf8_lossy(&output.stderr).trim(),
676            "Prebuilt postgres image not available, falling back to local build"
677        );
678        false
679    }
680
681    async fn build_postgres_image_local(
682        &self,
683        major_version: &str,
684        tag: &str,
685    ) -> Result<(), RuntimeError> {
686        let build_dir =
687            tempfile::tempdir().map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
688        let assets: [(&str, &str); 5] = [
689            ("Dockerfile", POSTGRES_DOCKERFILE),
690            ("aws_commons.control", AWS_COMMONS_CONTROL),
691            ("aws_commons--1.0.sql", AWS_COMMONS_SQL),
692            ("aws_lambda.control", AWS_LAMBDA_CONTROL),
693            ("aws_lambda--1.0.sql", AWS_LAMBDA_SQL),
694        ];
695        for (name, contents) in assets {
696            tokio::fs::write(build_dir.path().join(name), contents)
697                .await
698                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
699        }
700
701        tracing::info!(
702            tag = %tag,
703            "Building fakecloud-postgres image locally (first use can take ~60s)"
704        );
705
706        let output = tokio::process::Command::new(&self.cli)
707            .args([
708                "build",
709                "--build-arg",
710                &format!("PG_VERSION={major_version}"),
711                "-t",
712                tag,
713                ".",
714            ])
715            .current_dir(build_dir.path())
716            .output()
717            .await
718            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
719
720        if !output.status.success() {
721            return Err(RuntimeError::ContainerStartFailed(format!(
722                "docker build for {} failed: {}",
723                tag,
724                String::from_utf8_lossy(&output.stderr).trim()
725            )));
726        }
727
728        Ok(())
729    }
730
731    pub async fn dump_database(
732        &self,
733        db_instance_identifier: &str,
734        engine: &str,
735        username: &str,
736        password: &str,
737        db_name: &str,
738    ) -> Result<Vec<u8>, RuntimeError> {
739        let container = self
740            .containers
741            .read()
742            .get(db_instance_identifier)
743            .cloned()
744            .ok_or(RuntimeError::Unavailable)?;
745
746        let args: Vec<String> = match engine {
747            "mysql" | "mariadb" => vec![
748                "exec".into(),
749                container.container_id.clone(),
750                "mysqldump".into(),
751                "-u".into(),
752                username.into(),
753                format!("-p{password}"),
754                db_name.into(),
755            ],
756            "postgres" => vec![
757                "exec".into(),
758                container.container_id.clone(),
759                "pg_dump".into(),
760                "-U".into(),
761                username.into(),
762                "-d".into(),
763                db_name.into(),
764                "--no-password".into(),
765            ],
766            // Heavy engines don't ship with a portable dump CLI we can
767            // shell out to from the host the same way pg_dump and
768            // mysqldump are guaranteed available. Surface a clear
769            // error so callers (snapshot/read-replica) don't silently
770            // run the wrong tool against an Oracle/SQL Server/Db2
771            // container.
772            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
773            | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
774                return Err(RuntimeError::ContainerStartFailed(format!(
775                    "engine {engine} is not yet supported by the snapshot/read-replica path; \
776                     emulator stores the API state but cannot dump the underlying database"
777                )));
778            }
779            other => {
780                return Err(RuntimeError::ContainerStartFailed(format!(
781                    "engine {other} is not supported by dump_database"
782                )));
783            }
784        };
785
786        let output = tokio::process::Command::new(&self.cli)
787            .args(&args)
788            .output()
789            .await
790            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
791
792        if !output.status.success() {
793            return Err(RuntimeError::ContainerStartFailed(format!(
794                "dump failed: {}",
795                String::from_utf8_lossy(&output.stderr).trim()
796            )));
797        }
798
799        Ok(output.stdout)
800    }
801
802    pub async fn restore_database(
803        &self,
804        db_instance_identifier: &str,
805        engine: &str,
806        username: &str,
807        password: &str,
808        db_name: &str,
809        dump_data: &[u8],
810    ) -> Result<(), RuntimeError> {
811        let container = self
812            .containers
813            .read()
814            .get(db_instance_identifier)
815            .cloned()
816            .ok_or(RuntimeError::Unavailable)?;
817
818        let args: Vec<String> = match engine {
819            "mysql" | "mariadb" => vec![
820                "exec".into(),
821                "-i".into(),
822                container.container_id.clone(),
823                "mysql".into(),
824                "-u".into(),
825                username.into(),
826                format!("-p{password}"),
827                db_name.into(),
828            ],
829            "postgres" => vec![
830                "exec".into(),
831                "-i".into(),
832                container.container_id.clone(),
833                "psql".into(),
834                "-U".into(),
835                username.into(),
836                "-d".into(),
837                db_name.into(),
838                "--no-password".into(),
839                "-v".into(),
840                "ON_ERROR_STOP=1".into(),
841            ],
842            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
843            | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
844                return Err(RuntimeError::ContainerStartFailed(format!(
845                    "engine {engine} is not yet supported by the snapshot-restore path"
846                )));
847            }
848            other => {
849                return Err(RuntimeError::ContainerStartFailed(format!(
850                    "engine {other} is not supported by restore_database"
851                )));
852            }
853        };
854
855        let mut child = tokio::process::Command::new(&self.cli)
856            .args(&args)
857            .stdin(std::process::Stdio::piped())
858            .stdout(std::process::Stdio::piped())
859            .stderr(std::process::Stdio::piped())
860            .spawn()
861            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
862
863        if let Some(mut stdin) = child.stdin.take() {
864            use tokio::io::AsyncWriteExt;
865            stdin
866                .write_all(dump_data)
867                .await
868                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
869            drop(stdin);
870        }
871
872        let output = child
873            .wait_with_output()
874            .await
875            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
876
877        if !output.status.success() {
878            return Err(RuntimeError::ContainerStartFailed(format!(
879                "restore failed: {}",
880                String::from_utf8_lossy(&output.stderr).trim()
881            )));
882        }
883
884        Ok(())
885    }
886}
887
/// Probe whether `cli` is a usable container runtime by running
/// `<cli> info` with its stdout and stderr discarded.
///
/// Yields `true` only when the command both launches and exits with a
/// success status; a binary that cannot be spawned or a non-zero exit
/// yields `false`.
fn cli_available(cli: &str) -> bool {
    use std::process::{Command, Stdio};

    let probe = Command::new(cli)
        .arg("info")
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();

    match probe {
        Ok(exit) => exit.success(),
        Err(_) => false,
    }
}
897
/// On Linux Docker bridge networks, ask the daemon for the bridge
/// gateway IP so containers can reach the host. macOS and Windows use
/// the magic `host-gateway` alias instead.
///
/// Runs `<cli> network inspect bridge` and returns the first IPv4
/// gateway it reports. Returns `None` in any of these cases:
/// * the CLI cannot be run;
/// * it exits unsuccessfully;
/// * no reported gateway parses as an IPv4 address (e.g. an IPv6-only bridge).
fn detect_bridge_gateway(cli: &str) -> Option<String> {
    // `println` puts each IPAM entry's gateway on its own line. Without a
    // separator, a dual-stack bridge (IPv4 + IPv6 config) renders as one
    // fused string such as `172.17.0.1fd00::1`, which is not an address.
    let output = std::process::Command::new(cli)
        .args([
            "network",
            "inspect",
            "bridge",
            "--format",
            "{{range .IPAM.Config}}{{println .Gateway}}{{end}}",
        ])
        .output()
        .ok()?;
    if !output.status.success() {
        return None;
    }
    let stdout = String::from_utf8_lossy(&output.stdout);
    let gateway = stdout
        .lines()
        .map(str::trim)
        .find(|line| line.parse::<std::net::Ipv4Addr>().is_ok())?;
    Some(gateway.to_string())
}
921
922/// Build the postgres image reference for a given major version. Uses
923/// `<registry>/fakecloud-postgres:<major>-<fakecloud-version>`, where
924/// the registry comes from `FAKECLOUD_POSTGRES_REGISTRY` (defaults to
925/// the public `ghcr.io/faiscadev`). The version pin guarantees the
926/// runtime asks the daemon for the same image CI publishes for this
927/// fakecloud release; mismatched assets force a local rebuild via
928/// the fall-through in `ensure_postgres_image`.
929fn postgres_image_tag(major_version: &str) -> String {
930    let registry = std::env::var("FAKECLOUD_POSTGRES_REGISTRY")
931        .unwrap_or_else(|_| DEFAULT_POSTGRES_REGISTRY.to_string());
932    let registry = registry.trim_end_matches('/');
933    format!(
934        "{}/fakecloud-postgres:{}-{}",
935        registry,
936        major_version,
937        env!("CARGO_PKG_VERSION")
938    )
939}
940
#[cfg(test)]
mod tests {
    use super::*;

    const REGISTRY_ENV: &str = "FAKECLOUD_POSTGRES_REGISTRY";

    /// Restores an environment variable to the value captured at creation
    /// when dropped. Restoring in `Drop` means a failing assertion cannot
    /// leak an override into later tests in the same process.
    struct EnvRestore {
        key: &'static str,
        prev: Option<String>,
    }

    impl EnvRestore {
        fn capture(key: &'static str) -> Self {
            Self {
                key,
                prev: std::env::var(key).ok(),
            }
        }
    }

    impl Drop for EnvRestore {
        fn drop(&mut self) {
            match self.prev.take() {
                Some(v) => std::env::set_var(self.key, v),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// Single test (rather than three) so the cases run sequentially —
    /// `postgres_image_tag` reads a process-global env var and parallel
    /// `cargo test` workers would race over it otherwise.
    #[test]
    fn postgres_image_tag_resolves_registry_overrides() {
        let _restore = EnvRestore::capture(REGISTRY_ENV);

        std::env::remove_var(REGISTRY_ENV);
        assert_eq!(
            postgres_image_tag("16"),
            format!(
                "ghcr.io/faiscadev/fakecloud-postgres:16-{}",
                env!("CARGO_PKG_VERSION")
            )
        );

        std::env::set_var(REGISTRY_ENV, "registry.example.com/team");
        assert_eq!(
            postgres_image_tag("15"),
            format!(
                "registry.example.com/team/fakecloud-postgres:15-{}",
                env!("CARGO_PKG_VERSION")
            )
        );

        std::env::set_var(REGISTRY_ENV, "registry.example.com/team/");
        assert_eq!(
            postgres_image_tag("13"),
            format!(
                "registry.example.com/team/fakecloud-postgres:13-{}",
                env!("CARGO_PKG_VERSION")
            )
        );
    }
}