Skip to main content

fakecloud_rds/runtime/
mod.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fakecloud_core::container_net::{detect_container_cli, HostNetworking};
6use parking_lot::RwLock;
7use tokio_postgres::NoTls;
8
9mod k8s;
10
// Build assets for the fakecloud-postgres image, embedded into the binary at
// compile time by `include_str!` so a local image build needs no access to
// the source tree: the Dockerfile plus the extension control/SQL scripts for
// aws_commons, aws_lambda and aws_s3.
const POSTGRES_DOCKERFILE: &str = include_str!("../../assets/postgres/Dockerfile");
const AWS_COMMONS_CONTROL: &str = include_str!("../../assets/postgres/aws_commons.control");
const AWS_COMMONS_SQL: &str = include_str!("../../assets/postgres/aws_commons--1.1.sql");
// Update script from aws_commons 1.0 to 1.1 (PostgreSQL's
// `<ext>--<from>--<to>.sql` naming).
const AWS_COMMONS_UPGRADE_SQL: &str =
    include_str!("../../assets/postgres/aws_commons--1.0--1.1.sql");
const AWS_LAMBDA_CONTROL: &str = include_str!("../../assets/postgres/aws_lambda.control");
const AWS_LAMBDA_SQL: &str = include_str!("../../assets/postgres/aws_lambda--1.0.sql");
const AWS_S3_CONTROL: &str = include_str!("../../assets/postgres/aws_s3.control");
const AWS_S3_SQL: &str = include_str!("../../assets/postgres/aws_s3--1.0.sql");

// Build assets for the fakecloud MySQL image: Dockerfile, the C source of the
// bridge UDF, a bootstrap shell script, and a bootstrap SQL file.
// NOTE(review): the `.sql.tmpl` suffix suggests placeholders are substituted
// before the file is written to the build context — confirm in the image
// builder.
const MYSQL_DOCKERFILE: &str = include_str!("../../assets/mysql/Dockerfile");
const MYSQL_UDF_C: &str = include_str!("../../assets/mysql/fakecloud_udf.c");
const MYSQL_BOOTSTRAP_SH: &str = include_str!("../../assets/mysql/fakecloud-bootstrap.sh");
const MYSQL_BOOTSTRAP_SQL: &str =
    include_str!("../../assets/mysql/99-fakecloud-bootstrap.sql.tmpl");

// The same four build assets for the fakecloud MariaDB image.
const MARIADB_DOCKERFILE: &str = include_str!("../../assets/mariadb/Dockerfile");
const MARIADB_UDF_C: &str = include_str!("../../assets/mariadb/fakecloud_udf.c");
const MARIADB_BOOTSTRAP_SH: &str = include_str!("../../assets/mariadb/fakecloud-bootstrap.sh");
const MARIADB_BOOTSTRAP_SQL: &str =
    include_str!("../../assets/mariadb/99-fakecloud-bootstrap.sql.tmpl");

/// Default registry that hosts the prebuilt postgres images. CI publishes
/// to `ghcr.io/faiscadev/fakecloud-postgres:<major>-<version>` on each
/// release tag (see `.github/workflows/docker-rds-images.yml`).
/// Override with the `FAKECLOUD_POSTGRES_REGISTRY` env var (e.g. for
/// private mirrors); set `FAKECLOUD_REBUILD_POSTGRES_IMAGE=1` to force
/// a local rebuild even when the published tag is reachable.
const DEFAULT_POSTGRES_REGISTRY: &str = "ghcr.io/faiscadev";
40
/// A database engine started for one RDS instance, plus the address clients
/// should use to reach it. Cached per instance identifier in
/// `RdsRuntime::containers`.
#[derive(Debug, Clone)]
pub struct RunningDbContainer {
    /// Backend-specific handle: a Docker container id, or a Pod name.
    pub container_id: String,
    /// Host-published port (Docker) or the engine's in-Pod port (k8s).
    pub host_port: u16,
    /// Address clients (and fakecloud's own readiness/restore) connect to.
    /// On Docker this is the runtime's sibling host: `127.0.0.1` when
    /// fakecloud runs directly on the host, or the host alias
    /// (`host.docker.internal` / `host.containers.internal`) when fakecloud
    /// is itself containerized. On k8s it is the Pod IP.
    pub endpoint_address: String,
    /// Port clients connect to: the published host port for Docker, the
    /// engine's standard port for k8s.
    pub endpoint_port: u16,
}
54
/// Starts, tracks, and tears down the database engines behind fakecloud RDS
/// instances, using either the Docker/Podman CLI backend or the Kubernetes
/// backend.
pub struct RdsRuntime {
    /// Container CLI binary (as returned by `detect_container_cli`) that every
    /// Docker-backend command is run with. Empty on the k8s backend.
    cli: String,
    /// Running engines, keyed by DB instance identifier.
    containers: RwLock<HashMap<String, RunningDbContainer>>,
    /// `fakecloud-<pid>`. Attached to each created container as the
    /// `fakecloud-instance=<id>` label.
    instance_id: String,
    /// Container-to-host networking resolved from the shared
    /// `fakecloud-core::container_net` helper (issue #1539): the host alias
    /// the spawned DB container uses to reach fakecloud, the `--add-host`
    /// flag (None under podman), and the sibling address fakecloud uses to
    /// reach the DB it just spawned. Unused on the k8s backend.
    net: HostNetworking,
    /// Port this fakecloud server listens on. Bridge-aware DB containers
    /// receive `FAKECLOUD_ENDPOINT=http://<host alias>:<server_port>`.
    server_port: u16,
    /// Async locks for image resolution. NOTE(review): presumably keyed by
    /// image tag, with the `bool` recording whether that tag has already
    /// been resolved. It is used by `ensure_bridge_image`, which is not
    /// visible here — confirm.
    image_cache: RwLock<HashMap<String, Arc<tokio::sync::Mutex<bool>>>>,
    /// `Some` when running on the Kubernetes backend; the public methods
    /// dispatch to it and the Docker fields above go unused. `None` is the
    /// default Docker/Podman backend.
    k8s: Option<k8s::K8sDb>,
}
72
/// Failure to start, restart, or reach a database engine.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
    /// In the code visible here, returned by `restart_container` when the
    /// Docker backend tracks no engine for the requested instance.
    #[error("container runtime is unavailable")]
    Unavailable,
    /// Any other start/readiness failure. The payload is CLI stderr, a spawn
    /// error, or a description of what timed out.
    #[error("container failed to start: {0}")]
    ContainerStartFailed(String),
}
80
/// Error initializing the Kubernetes backend at startup. Surfaced to the
/// operator so a misconfigured cluster fails fast rather than silently
/// falling back to Docker.
#[derive(Debug, thiserror::Error)]
pub enum BackendInitError {
    /// Wraps `fakecloud_k8s::K8sEnvError`, displayed unchanged.
    /// NOTE(review): presumably raised while `K8sDb::from_env` reads its
    /// environment configuration — confirm.
    #[error(transparent)]
    Env(#[from] fakecloud_k8s::K8sEnvError),
    /// Wraps `fakecloud_k8s::K8sPodConfigError`, displayed unchanged.
    #[error(transparent)]
    PodConfig(#[from] fakecloud_k8s::K8sPodConfigError),
    /// Connecting to the cluster failed. The payload is a description of the
    /// failure, presumably the client's error text.
    #[error("failed to connect to the Kubernetes cluster: {0}")]
    Connect(String),
}
93
94impl RdsRuntime {
95    pub fn new(server_port: u16) -> Option<Self> {
96        // CLI detection, host alias, `--add-host` injection, and the
97        // in-container sibling address all come from the shared
98        // `container_net` helper so RDS can't drift from Lambda/ECS/
99        // ElastiCache on the issue #1539 fix (podman gets no `--add-host`
100        // and the `host.containers.internal` alias; macOS docker gets
101        // `host-gateway`; Linux docker gets the resolved bridge IP).
102        let cli = detect_container_cli()?;
103        let net = HostNetworking::detect(&cli);
104
105        Some(Self {
106            cli,
107            containers: RwLock::new(HashMap::new()),
108            instance_id: format!("fakecloud-{}", std::process::id()),
109            net,
110            server_port,
111            image_cache: RwLock::new(HashMap::new()),
112            k8s: None,
113        })
114    }
115
116    /// Construct the Kubernetes backend. `server_port` is fakecloud's
117    /// bound port (used when `FAKECLOUD_K8S_SELF_URL` omits one). Fails
118    /// fast on misconfiguration — never silently degrades to Docker.
119    pub async fn new_k8s(server_port: u16) -> Result<Self, BackendInitError> {
120        let db = k8s::K8sDb::from_env(server_port).await?;
121        Ok(Self {
122            cli: String::new(),
123            containers: RwLock::new(HashMap::new()),
124            instance_id: format!("fakecloud-{}", std::process::id()),
125            // Docker networking is unused on the k8s backend (Pod addresses
126            // come from `K8sDb`); a host-loopback default keeps the field
127            // populated without affecting behavior.
128            net: HostNetworking {
129                host_alias: String::new(),
130                add_host_arg: None,
131                sibling_host: "127.0.0.1".to_string(),
132            },
133            server_port,
134            image_cache: RwLock::new(HashMap::new()),
135            k8s: Some(db),
136        })
137    }
138
139    pub fn cli_name(&self) -> &str {
140        if self.k8s.is_some() {
141            "kubernetes"
142        } else {
143            &self.cli
144        }
145    }
146
147    /// Test-only runtime that needs no container daemon. Handler tests that
148    /// only assert the synchronously-stored "creating" placeholder (e.g.
149    /// create-time tag persistence) just need `require_runtime` to return
150    /// `Some`; the async container-start that follows fails harmlessly in
151    /// the background and never touches the asserted fields.
152    #[cfg(test)]
153    pub(crate) fn new_stub() -> Self {
154        Self {
155            cli: "true".to_string(),
156            containers: RwLock::new(HashMap::new()),
157            instance_id: format!("fakecloud-{}", std::process::id()),
158            net: HostNetworking {
159                host_alias: String::new(),
160                add_host_arg: None,
161                sibling_host: "127.0.0.1".to_string(),
162            },
163            server_port: 0,
164            image_cache: RwLock::new(HashMap::new()),
165            k8s: None,
166        }
167    }
168
169    /// Sweep DB Pods orphaned by a previous process (k8s only; no-op on
170    /// the Docker backend, which the shared container reaper handles).
171    pub async fn reap_stale(&self) {
172        if let Some(k) = &self.k8s {
173            k.reap_stale().await;
174        }
175    }
176
177    #[allow(clippy::too_many_arguments)]
178    pub async fn ensure_postgres(
179        &self,
180        db_instance_identifier: &str,
181        engine: &str,
182        engine_version: &str,
183        username: &str,
184        password: &str,
185        db_name: &str,
186        account_id: &str,
187        region: &str,
188        tags: &[crate::state::RdsTag],
189    ) -> Result<RunningDbContainer, RuntimeError> {
190        if let Some(k) = &self.k8s {
191            // Per-instance Pod scheduling overrides come from the
192            // resource's reserved `fakecloud-k8s/*` tags. Ignored on the
193            // Docker backend below.
194            let tag_map: std::collections::BTreeMap<String, String> = tags
195                .iter()
196                .map(|t| (t.key.clone(), t.value.clone()))
197                .collect();
198            let running = k
199                .ensure(
200                    db_instance_identifier,
201                    engine,
202                    engine_version,
203                    username,
204                    password,
205                    db_name,
206                    account_id,
207                    region,
208                    &tag_map,
209                )
210                .await?;
211            self.containers
212                .write()
213                .insert(db_instance_identifier.to_string(), running.clone());
214            return Ok(running);
215        }
216        self.stop_container(db_instance_identifier).await;
217
218        // Determine Docker image and port based on engine. Postgres,
219        // MySQL, and MariaDB all use prebuilt fakecloud-* images that
220        // bake in the bridge UDFs / extensions and call back into the
221        // host fakecloud server; the heavier engines (oracle/mssql/db2)
222        // stay on upstream images. `bridge_engine_version` is `Some(_)`
223        // for the bridge-aware engines and gates the `--add-host`
224        // setup below.
225        let (image, port, env_vars, bridge_engine_version) = match engine {
226            "postgres" => {
227                let major_version = engine_version.split('.').next().unwrap_or("16");
228                let image = self.ensure_postgres_image(major_version).await?;
229                let env_vars = vec![
230                    format!("POSTGRES_USER={username}"),
231                    format!("POSTGRES_PASSWORD={password}"),
232                    format!("POSTGRES_DB={db_name}"),
233                    format!(
234                        "FAKECLOUD_ENDPOINT=http://{}:{}",
235                        self.net.host_alias, self.server_port
236                    ),
237                    format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
238                    format!("FAKECLOUD_REGION={region}"),
239                ];
240                (image, "5432", env_vars, Some(major_version.to_string()))
241            }
242            "mysql" => {
243                // 5.7 was dropped after Oracle community support ended
244                // (Oct 2023) — the image base no longer ships the build
245                // deps we need for the UDF. Any 5.7.* engine version
246                // resolves to 8.0.
247                let _ = engine_version;
248                let major_version = "8.0";
249                let image = self.ensure_mysql_image(major_version).await?;
250                let env_vars = vec![
251                    format!("MYSQL_ROOT_PASSWORD={password}"),
252                    format!("MYSQL_USER={username}"),
253                    format!("MYSQL_PASSWORD={password}"),
254                    format!("MYSQL_DATABASE={db_name}"),
255                    format!(
256                        "FAKECLOUD_ENDPOINT=http://{}:{}",
257                        self.net.host_alias, self.server_port
258                    ),
259                    format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
260                    format!("FAKECLOUD_REGION={region}"),
261                ];
262                (image, "3306", env_vars, Some(major_version.to_string()))
263            }
264            "mariadb" => {
265                let major_version = if engine_version.starts_with("10.11") {
266                    "10.11"
267                } else if engine_version.starts_with("11.4") {
268                    "11.4"
269                } else {
270                    "10.6"
271                };
272                let image = self.ensure_mariadb_image(major_version).await?;
273                let env_vars = vec![
274                    format!("MARIADB_ROOT_PASSWORD={password}"),
275                    format!("MARIADB_USER={username}"),
276                    format!("MARIADB_PASSWORD={password}"),
277                    format!("MARIADB_DATABASE={db_name}"),
278                    format!(
279                        "FAKECLOUD_ENDPOINT=http://{}:{}",
280                        self.net.host_alias, self.server_port
281                    ),
282                    format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
283                    format!("FAKECLOUD_REGION={region}"),
284                ];
285                (image, "3306", env_vars, Some(major_version.to_string()))
286            }
287            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
288                // Oracle Database Free is the no-cost dev edition shipped by
289                // Oracle. The container exposes a "FREEPDB1" pluggable
290                // database and creates the SYSTEM user with the password
291                // from ORACLE_PASSWORD.
292                let image = "gvenzl/oracle-free:23-slim".to_string();
293                let env_vars = vec![
294                    format!("ORACLE_PASSWORD={password}"),
295                    format!("APP_USER={username}"),
296                    format!("APP_USER_PASSWORD={password}"),
297                    format!("ORACLE_DATABASE={db_name}"),
298                ];
299                (image, "1521", env_vars, None)
300            }
301            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
302                // SQL Server Express is free for dev/test with no license
303                // ceiling. SA password must satisfy MSSQL's complexity
304                // requirements (>=8 chars, mix of classes); callers should
305                // supply one that does or the container will refuse to
306                // start.
307                let image = "mcr.microsoft.com/mssql/server:2022-latest".to_string();
308                let env_vars = vec![
309                    "ACCEPT_EULA=Y".to_string(),
310                    format!("MSSQL_SA_PASSWORD={password}"),
311                    "MSSQL_PID=Express".to_string(),
312                ];
313                (image, "1433", env_vars, None)
314            }
315            "db2-se" | "db2-ae" => {
316                // Db2 Community Edition is free under the standard IBM
317                // Community License. The container exposes a single
318                // database named after DBNAME, owned by db2inst1.
319                let image = "icr.io/db2_community/db2:latest".to_string();
320                let env_vars = vec![
321                    "LICENSE=accept".to_string(),
322                    "DB2INSTANCE=db2inst1".to_string(),
323                    format!("DB2INST1_PASSWORD={password}"),
324                    format!("DBNAME={db_name}"),
325                ];
326                (image, "50000", env_vars, None)
327            }
328            _ => {
329                return Err(RuntimeError::ContainerStartFailed(format!(
330                    "Unsupported engine: {}",
331                    engine
332                )))
333            }
334        };
335
336        // Db2 needs --privileged to set kernel parameters during startup.
337        let needs_privileged = matches!(engine, "db2-se" | "db2-ae");
338
339        // Build container create args
340        let mut args = vec![
341            "create".to_string(),
342            "-p".to_string(),
343            format!(":{}", port),
344            "--label".to_string(),
345            format!("fakecloud-rds={db_instance_identifier}"),
346            "--label".to_string(),
347            format!("fakecloud-instance={}", self.instance_id),
348        ];
349
350        if needs_privileged {
351            args.push("--privileged".to_string());
352        }
353
354        // Bridge-aware engines (postgres aws_lambda, mysql/mariadb
355        // fakecloud_post UDF) call back into fakecloud over HTTP. Inject the
356        // host-gateway `--add-host` mapping so the in-container code can
357        // resolve the host alias. No-op under podman, which provides
358        // `host.containers.internal` natively — passing `host-gateway` there
359        // fails with "host containers internal IP address is empty" (#1539).
360        if bridge_engine_version.is_some() {
361            self.net.push_add_host_args(&mut args);
362        }
363
364        for env_var in env_vars {
365            args.push("-e".to_string());
366            args.push(env_var);
367        }
368
369        args.push(image);
370
371        let output = tokio::process::Command::new(&self.cli)
372            .args(&args)
373            .output()
374            .await
375            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
376
377        if !output.status.success() {
378            return Err(RuntimeError::ContainerStartFailed(
379                String::from_utf8_lossy(&output.stderr).trim().to_string(),
380            ));
381        }
382
383        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
384        let start_result = tokio::process::Command::new(&self.cli)
385            .args(["start", &container_id])
386            .output()
387            .await
388            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
389
390        if !start_result.status.success() {
391            self.remove_container(&container_id).await;
392            return Err(RuntimeError::ContainerStartFailed(format!(
393                "container start failed: {}",
394                String::from_utf8_lossy(&start_result.stderr).trim()
395            )));
396        }
397
398        let host_port = match self.lookup_port(&container_id, port).await {
399            Ok(host_port) => host_port,
400            Err(error) => {
401                self.remove_container(&container_id).await;
402                return Err(error);
403            }
404        };
405
406        // Wait for database to be ready
407        let wait_result = match engine {
408            "postgres" => {
409                self.wait_for_postgres(username, password, db_name, host_port)
410                    .await
411            }
412            "mysql" | "mariadb" => {
413                self.wait_for_mysql(username, password, db_name, host_port)
414                    .await
415            }
416            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
417                self.wait_for_oracle(&container_id, host_port).await
418            }
419            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
420                self.wait_for_sqlserver(&container_id, host_port).await
421            }
422            "db2-se" | "db2-ae" => self.wait_for_db2(&container_id, host_port).await,
423            _ => unreachable!("engine already validated"),
424        };
425
426        if let Err(error) = wait_result {
427            self.remove_container(&container_id).await;
428            return Err(error);
429        }
430
431        let running = RunningDbContainer {
432            container_id,
433            host_port,
434            // 127.0.0.1 on the host; host.docker.internal /
435            // host.containers.internal when fakecloud is itself
436            // containerized (issue #1539).
437            endpoint_address: self.net.sibling_host.clone(),
438            endpoint_port: host_port,
439        };
440        self.containers
441            .write()
442            .insert(db_instance_identifier.to_string(), running.clone());
443        Ok(running)
444    }
445
446    pub async fn stop_container(&self, db_instance_identifier: &str) {
447        let container = self.containers.write().remove(db_instance_identifier);
448        if let Some(container) = container {
449            if let Some(k) = &self.k8s {
450                k.delete_pod(&container.container_id).await;
451            } else {
452                self.remove_container(&container.container_id).await;
453            }
454        }
455    }
456
457    pub async fn restart_container(
458        &self,
459        db_instance_identifier: &str,
460        engine: &str,
461        username: &str,
462        password: &str,
463        db_name: &str,
464    ) -> Result<RunningDbContainer, RuntimeError> {
465        if let Some(k) = &self.k8s {
466            let running = k
467                .restart(db_instance_identifier, engine, username, password, db_name)
468                .await?;
469            self.containers
470                .write()
471                .insert(db_instance_identifier.to_string(), running.clone());
472            return Ok(running);
473        }
474        let running = self
475            .containers
476            .read()
477            .get(db_instance_identifier)
478            .cloned()
479            .ok_or(RuntimeError::Unavailable)?;
480
481        let output = tokio::process::Command::new(&self.cli)
482            .args(["restart", &running.container_id])
483            .output()
484            .await
485            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
486
487        if !output.status.success() {
488            return Err(RuntimeError::ContainerStartFailed(format!(
489                "container restart failed: {}",
490                String::from_utf8_lossy(&output.stderr).trim()
491            )));
492        }
493
494        let port = match engine {
495            "postgres" => "5432",
496            "mysql" | "mariadb" => "3306",
497            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "1521",
498            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "1433",
499            "db2-se" | "db2-ae" => "50000",
500            _ => "5432", // fallback
501        };
502
503        let host_port = self.lookup_port(&running.container_id, port).await?;
504
505        match engine {
506            "postgres" => {
507                self.wait_for_postgres(username, password, db_name, host_port)
508                    .await?
509            }
510            "mysql" | "mariadb" => {
511                self.wait_for_mysql(username, password, db_name, host_port)
512                    .await?
513            }
514            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
515                self.wait_for_oracle(&running.container_id, host_port)
516                    .await?
517            }
518            "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
519                self.wait_for_sqlserver(&running.container_id, host_port)
520                    .await?
521            }
522            "db2-se" | "db2-ae" => self.wait_for_db2(&running.container_id, host_port).await?,
523            _ => {
524                self.wait_for_postgres(username, password, db_name, host_port)
525                    .await?
526            }
527        };
528        let running = RunningDbContainer {
529            container_id: running.container_id,
530            host_port,
531            endpoint_address: self.net.sibling_host.clone(),
532            endpoint_port: host_port,
533        };
534        self.containers
535            .write()
536            .insert(db_instance_identifier.to_string(), running.clone());
537        Ok(running)
538    }
539
540    pub async fn stop_all(&self) {
541        let containers: Vec<String> = {
542            let mut containers = self.containers.write();
543            containers
544                .drain()
545                .map(|(_, container)| container.container_id)
546                .collect()
547        };
548        for container_id in containers {
549            self.remove_container(&container_id).await;
550        }
551    }
552
553    async fn lookup_port(&self, container_id: &str, port: &str) -> Result<u16, RuntimeError> {
554        let port_output = tokio::process::Command::new(&self.cli)
555            .args(["port", container_id, port])
556            .output()
557            .await
558            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
559
560        let port_str = String::from_utf8_lossy(&port_output.stdout);
561        port_str
562            .trim()
563            .rsplit(':')
564            .next()
565            .and_then(|value| value.parse::<u16>().ok())
566            .ok_or_else(|| {
567                RuntimeError::ContainerStartFailed(format!(
568                    "could not determine container port from '{}'",
569                    port_str.trim()
570                ))
571            })
572    }
573
574    async fn wait_for_postgres(
575        &self,
576        username: &str,
577        password: &str,
578        db_name: &str,
579        host_port: u16,
580    ) -> Result<(), RuntimeError> {
581        for _ in 0..40 {
582            tokio::time::sleep(Duration::from_millis(500)).await;
583            let connection_string = format!(
584                "host={} port={host_port} user={username} password={password} dbname={db_name}",
585                self.net.sibling_host
586            );
587            if let Ok((client, connection)) =
588                tokio_postgres::connect(&connection_string, NoTls).await
589            {
590                tokio::spawn(async move {
591                    let _ = connection.await;
592                });
593                if client.simple_query("SELECT 1").await.is_ok() {
594                    return Ok(());
595                }
596            }
597        }
598
599        Err(RuntimeError::ContainerStartFailed(
600            "postgres container did not become ready within 20 seconds".to_string(),
601        ))
602    }
603
604    async fn wait_for_mysql(
605        &self,
606        username: &str,
607        password: &str,
608        db_name: &str,
609        host_port: u16,
610    ) -> Result<(), RuntimeError> {
611        use mysql_async::prelude::*;
612        use mysql_async::OptsBuilder;
613
614        for attempt in 1..=40 {
615            let opts = OptsBuilder::default()
616                .ip_or_hostname(self.net.sibling_host.as_str())
617                .tcp_port(host_port)
618                .user(Some(username))
619                .pass(Some(password))
620                .db_name(Some(db_name));
621
622            match mysql_async::Conn::new(opts).await {
623                Ok(mut conn) => {
624                    if conn.query_drop("SELECT 1").await.is_ok() {
625                        let _ = conn.disconnect().await;
626                        return Ok(());
627                    }
628                }
629                Err(_) => {
630                    if attempt < 40 {
631                        tokio::time::sleep(Duration::from_millis(500)).await;
632                    }
633                    continue;
634                }
635            }
636        }
637
638        Err(RuntimeError::ContainerStartFailed(
639            "MySQL/MariaDB container did not become ready within 20 seconds".to_string(),
640        ))
641    }
642
643    /// Wait for Oracle Database Free to finish bootstrapping. The
644    /// `gvenzl/oracle-free` image prints `DATABASE IS READY TO USE!`
645    /// to stdout once the listener accepts connections, so we poll
646    /// `docker logs` until that marker appears (or the deadline elapses).
647    /// Oracle XE/Free typically takes 30-90 seconds on first start.
648    async fn wait_for_oracle(
649        &self,
650        container_id: &str,
651        host_port: u16,
652    ) -> Result<(), RuntimeError> {
653        self.wait_for_log_marker(container_id, "DATABASE IS READY TO USE!", 240)
654            .await?;
655        self.wait_for_tcp(host_port, 30).await
656    }
657
658    /// Wait for SQL Server to be ready. The official mssql/server image
659    /// emits `SQL Server is now ready for client connections.` once it
660    /// accepts TCP connections on 1433.
661    async fn wait_for_sqlserver(
662        &self,
663        container_id: &str,
664        host_port: u16,
665    ) -> Result<(), RuntimeError> {
666        self.wait_for_log_marker(
667            container_id,
668            "SQL Server is now ready for client connections",
669            180,
670        )
671        .await?;
672        self.wait_for_tcp(host_port, 30).await
673    }
674
675    /// Wait for Db2 Community Edition to finish setup. The
676    /// `icr.io/db2_community/db2` image prints `Setup has completed.`
677    /// once the instance is up and the database has been created.
678    async fn wait_for_db2(&self, container_id: &str, host_port: u16) -> Result<(), RuntimeError> {
679        self.wait_for_log_marker(container_id, "Setup has completed", 360)
680            .await?;
681        self.wait_for_tcp(host_port, 60).await
682    }
683
684    /// Poll `docker logs <container>` until the supplied marker appears
685    /// in stdout or stderr. `deadline_secs` caps total wait.
686    async fn wait_for_log_marker(
687        &self,
688        container_id: &str,
689        marker: &str,
690        deadline_secs: u64,
691    ) -> Result<(), RuntimeError> {
692        let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
693        while std::time::Instant::now() < deadline {
694            let output = tokio::process::Command::new(&self.cli)
695                .args(["logs", container_id])
696                .output()
697                .await
698                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
699            let stdout = String::from_utf8_lossy(&output.stdout);
700            let stderr = String::from_utf8_lossy(&output.stderr);
701            if stdout.contains(marker) || stderr.contains(marker) {
702                return Ok(());
703            }
704            tokio::time::sleep(Duration::from_secs(2)).await;
705        }
706        Err(RuntimeError::ContainerStartFailed(format!(
707            "container did not log '{}' within {} seconds",
708            marker, deadline_secs
709        )))
710    }
711
712    /// TCP-probe the host port until it accepts a connection or the
713    /// deadline elapses. Use after `wait_for_log_marker` since the
714    /// listener may bind a moment after the readiness log line.
715    async fn wait_for_tcp(&self, host_port: u16, deadline_secs: u64) -> Result<(), RuntimeError> {
716        let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
717        let host = self.net.sibling_host.as_str();
718        while std::time::Instant::now() < deadline {
719            if tokio::net::TcpStream::connect((host, host_port))
720                .await
721                .is_ok()
722            {
723                return Ok(());
724            }
725            tokio::time::sleep(Duration::from_millis(500)).await;
726        }
727        Err(RuntimeError::ContainerStartFailed(format!(
728            "TCP probe to {}:{} did not succeed within {}s",
729            host, host_port, deadline_secs
730        )))
731    }
732
733    async fn remove_container(&self, container_id: &str) {
734        let _ = tokio::process::Command::new(&self.cli)
735            .args(["rm", "-f", container_id])
736            .output()
737            .await;
738    }
739
    /// Resolve (building if necessary) the fakecloud-postgres image for a
    /// postgres major version and return its tag.
    ///
    /// The image bakes in plpython3u plus the aws_commons and aws_lambda
    /// extension files, so users can run `CREATE EXTENSION aws_lambda CASCADE`
    /// inside any database. The tag comes from `bridge_image_tag` and is
    /// meant to include a content hash of the embedded assets, so editing
    /// them invalidates previously cached images automatically.
    ///
    /// Resolution order:
    /// 1. the in-process cache;
    /// 2. `docker image inspect` for a copy already on the daemon;
    /// 3. `docker pull` of the prebuilt image published by CI;
    /// 4. a local `docker build` from the embedded Dockerfile + extension
    ///    assets.
    ///
    /// The pull path is the happy path for end users on tagged releases. The
    /// build path covers dev builds, unreleased versions, and airgapped
    /// setups. NOTE(review): the ordering presumably lives in
    /// `ensure_bridge_image`, which receives the local build as a fallback
    /// closure — confirm there.
    ///
    /// Honors:
    /// - `FAKECLOUD_POSTGRES_REGISTRY` — registry prefix (default
    ///   `ghcr.io/faiscadev`); useful for private mirrors.
    /// - `FAKECLOUD_REBUILD_POSTGRES_IMAGE` — when set to a non-empty
    ///   value, skip inspect + pull and force a fresh local build.
    ///   Use after editing the embedded Dockerfile or extension SQL.
    pub(crate) async fn ensure_postgres_image(
        &self,
        major_version: &str,
    ) -> Result<String, RuntimeError> {
        let tag = bridge_image_tag("fakecloud-postgres", major_version);
        // Local-build fallback, handed the resolved tag.
        self.ensure_bridge_image(&tag, |tag| async move {
            self.build_postgres_image_local(major_version, &tag).await
        })
        .await
    }
770
771    async fn docker_image_exists(&self, tag: &str) -> bool {
772        tokio::process::Command::new(&self.cli)
773            .args(["image", "inspect", tag])
774            .stdout(std::process::Stdio::null())
775            .stderr(std::process::Stdio::null())
776            .status()
777            .await
778            .map(|status| status.success())
779            .unwrap_or(false)
780    }
781
782    async fn try_pull_image(&self, tag: &str) -> bool {
783        tracing::info!(tag = %tag, "Pulling prebuilt fakecloud-postgres image");
784        let output = match tokio::process::Command::new(&self.cli)
785            .args(["pull", tag])
786            .output()
787            .await
788        {
789            Ok(output) => output,
790            Err(e) => {
791                tracing::debug!(tag = %tag, error = %e, "docker pull failed to spawn");
792                return false;
793            }
794        };
795        if output.status.success() {
796            return true;
797        }
798        tracing::info!(
799            tag = %tag,
800            stderr = %String::from_utf8_lossy(&output.stderr).trim(),
801            "Prebuilt postgres image not available, falling back to local build"
802        );
803        false
804    }
805
806    async fn build_postgres_image_local(
807        &self,
808        major_version: &str,
809        tag: &str,
810    ) -> Result<(), RuntimeError> {
811        let assets: [(&str, &str); 8] = [
812            ("Dockerfile", POSTGRES_DOCKERFILE),
813            ("aws_commons.control", AWS_COMMONS_CONTROL),
814            ("aws_commons--1.1.sql", AWS_COMMONS_SQL),
815            ("aws_commons--1.0--1.1.sql", AWS_COMMONS_UPGRADE_SQL),
816            ("aws_lambda.control", AWS_LAMBDA_CONTROL),
817            ("aws_lambda--1.0.sql", AWS_LAMBDA_SQL),
818            ("aws_s3.control", AWS_S3_CONTROL),
819            ("aws_s3--1.0.sql", AWS_S3_SQL),
820        ];
821        self.build_image_local(
822            tag,
823            &assets,
824            &format!("PG_VERSION={major_version}"),
825            "fakecloud-postgres",
826        )
827        .await
828    }
829
830    /// Pull-first / build-fallback for the prebuilt fakecloud-mysql
831    /// image. Mirrors `ensure_postgres_image`. The image bakes a small
832    /// libcurl-backed UDF + Aurora-compatible `mysql.lambda_async` /
833    /// `mysql.lambda_sync` stored procedures.
834    pub(crate) async fn ensure_mysql_image(
835        &self,
836        major_version: &str,
837    ) -> Result<String, RuntimeError> {
838        let tag = bridge_image_tag("fakecloud-mysql", major_version);
839        self.ensure_bridge_image(&tag, |tag| async move {
840            self.build_mysql_image_local(major_version, &tag).await
841        })
842        .await
843    }
844
845    pub(crate) async fn ensure_mariadb_image(
846        &self,
847        major_version: &str,
848    ) -> Result<String, RuntimeError> {
849        let tag = bridge_image_tag("fakecloud-mariadb", major_version);
850        self.ensure_bridge_image(&tag, |tag| async move {
851            self.build_mariadb_image_local(major_version, &tag).await
852        })
853        .await
854    }
855
856    /// Shared pull-first/build-fallback orchestration used by every
857    /// bridge-aware engine. Holds the per-tag mutex, checks the local
858    /// daemon first, then tries the prebuilt image, and finally
859    /// invokes the supplied local-build closure.
860    async fn ensure_bridge_image<F, Fut>(
861        &self,
862        tag: &str,
863        build_local: F,
864    ) -> Result<String, RuntimeError>
865    where
866        F: FnOnce(String) -> Fut,
867        Fut: std::future::Future<Output = Result<(), RuntimeError>>,
868    {
869        let lock = {
870            let mut cache = self.image_cache.write();
871            cache
872                .entry(tag.to_string())
873                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(false)))
874                .clone()
875        };
876        let mut resolved = lock.lock().await;
877        if *resolved {
878            return Ok(tag.to_string());
879        }
880
881        let force_rebuild = std::env::var("FAKECLOUD_REBUILD_POSTGRES_IMAGE")
882            .map(|v| !v.is_empty())
883            .unwrap_or(false);
884
885        if !force_rebuild {
886            if self.docker_image_exists(tag).await {
887                *resolved = true;
888                return Ok(tag.to_string());
889            }
890            if self.try_pull_image(tag).await {
891                *resolved = true;
892                return Ok(tag.to_string());
893            }
894        }
895
896        build_local(tag.to_string()).await?;
897        *resolved = true;
898        Ok(tag.to_string())
899    }
900
901    async fn build_mysql_image_local(
902        &self,
903        major_version: &str,
904        tag: &str,
905    ) -> Result<(), RuntimeError> {
906        let assets: [(&str, &str); 4] = [
907            ("Dockerfile", MYSQL_DOCKERFILE),
908            ("fakecloud_udf.c", MYSQL_UDF_C),
909            ("fakecloud-bootstrap.sh", MYSQL_BOOTSTRAP_SH),
910            ("99-fakecloud-bootstrap.sql.tmpl", MYSQL_BOOTSTRAP_SQL),
911        ];
912        self.build_image_local(
913            tag,
914            &assets,
915            &format!("MYSQL_VERSION={major_version}"),
916            "fakecloud-mysql",
917        )
918        .await
919    }
920
921    async fn build_mariadb_image_local(
922        &self,
923        major_version: &str,
924        tag: &str,
925    ) -> Result<(), RuntimeError> {
926        let assets: [(&str, &str); 4] = [
927            ("Dockerfile", MARIADB_DOCKERFILE),
928            ("fakecloud_udf.c", MARIADB_UDF_C),
929            ("fakecloud-bootstrap.sh", MARIADB_BOOTSTRAP_SH),
930            ("99-fakecloud-bootstrap.sql.tmpl", MARIADB_BOOTSTRAP_SQL),
931        ];
932        self.build_image_local(
933            tag,
934            &assets,
935            &format!("MARIADB_VERSION={major_version}"),
936            "fakecloud-mariadb",
937        )
938        .await
939    }
940
941    async fn build_image_local(
942        &self,
943        tag: &str,
944        assets: &[(&str, &str)],
945        build_arg: &str,
946        image_label: &str,
947    ) -> Result<(), RuntimeError> {
948        let build_dir =
949            tempfile::tempdir().map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
950        for (name, contents) in assets {
951            tokio::fs::write(build_dir.path().join(name), contents)
952                .await
953                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
954        }
955
956        tracing::info!(
957            tag = %tag,
958            image = %image_label,
959            "Building {image_label} image locally (first use can take ~60s)"
960        );
961
962        let output = tokio::process::Command::new(&self.cli)
963            .args(["build", "--build-arg", build_arg, "-t", tag, "."])
964            .current_dir(build_dir.path())
965            .output()
966            .await
967            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
968
969        if !output.status.success() {
970            return Err(RuntimeError::ContainerStartFailed(format!(
971                "docker build for {} failed: {}",
972                tag,
973                String::from_utf8_lossy(&output.stderr).trim()
974            )));
975        }
976
977        Ok(())
978    }
979
980    pub async fn dump_database(
981        &self,
982        db_instance_identifier: &str,
983        engine: &str,
984        username: &str,
985        password: &str,
986        db_name: &str,
987    ) -> Result<Vec<u8>, RuntimeError> {
988        let container = self
989            .containers
990            .read()
991            .get(db_instance_identifier)
992            .cloned()
993            .ok_or(RuntimeError::Unavailable)?;
994
995        if let Some(k) = &self.k8s {
996            return k
997                .dump_database(&container.container_id, engine, username, password, db_name)
998                .await;
999        }
1000
1001        let args: Vec<String> = match engine {
1002            "mysql" | "mariadb" => vec![
1003                "exec".into(),
1004                container.container_id.clone(),
1005                "mysqldump".into(),
1006                "-u".into(),
1007                username.into(),
1008                format!("-p{password}"),
1009                db_name.into(),
1010            ],
1011            "postgres" => vec![
1012                "exec".into(),
1013                container.container_id.clone(),
1014                "pg_dump".into(),
1015                "-U".into(),
1016                username.into(),
1017                "-d".into(),
1018                db_name.into(),
1019                "--no-password".into(),
1020            ],
1021            // Heavy engines don't ship with a portable dump CLI we can
1022            // shell out to from the host the same way pg_dump and
1023            // mysqldump are guaranteed available. Surface a clear
1024            // error so callers (snapshot/read-replica) don't silently
1025            // run the wrong tool against an Oracle/SQL Server/Db2
1026            // container.
1027            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
1028            | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
1029                return Err(RuntimeError::ContainerStartFailed(format!(
1030                    "engine {engine} is not yet supported by the snapshot/read-replica path; \
1031                     emulator stores the API state but cannot dump the underlying database"
1032                )));
1033            }
1034            other => {
1035                return Err(RuntimeError::ContainerStartFailed(format!(
1036                    "engine {other} is not supported by dump_database"
1037                )));
1038            }
1039        };
1040
1041        let output = tokio::process::Command::new(&self.cli)
1042            .args(&args)
1043            .output()
1044            .await
1045            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1046
1047        if !output.status.success() {
1048            return Err(RuntimeError::ContainerStartFailed(format!(
1049                "dump failed: {}",
1050                String::from_utf8_lossy(&output.stderr).trim()
1051            )));
1052        }
1053
1054        Ok(output.stdout)
1055    }
1056
1057    /// Cat a file inside the running container by absolute path.
1058    /// Returns `Unavailable` when the runtime/daemon is missing or the
1059    /// instance has no live container; returns `ContainerStartFailed`
1060    /// when the file is missing or unreadable. Callers map those to the
1061    /// AWS-shaped responses.
1062    pub async fn read_log_file(
1063        &self,
1064        db_instance_identifier: &str,
1065        container_path: &str,
1066    ) -> Result<Vec<u8>, RuntimeError> {
1067        let container = self
1068            .containers
1069            .read()
1070            .get(db_instance_identifier)
1071            .cloned()
1072            .ok_or(RuntimeError::Unavailable)?;
1073
1074        if let Some(k) = &self.k8s {
1075            return k.read_file(&container.container_id, container_path).await;
1076        }
1077
1078        let output = tokio::process::Command::new(&self.cli)
1079            .args(["exec", &container.container_id, "cat", container_path])
1080            .output()
1081            .await
1082            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1083
1084        if !output.status.success() {
1085            return Err(RuntimeError::ContainerStartFailed(format!(
1086                "cat {container_path} failed: {}",
1087                String::from_utf8_lossy(&output.stderr).trim()
1088            )));
1089        }
1090
1091        Ok(output.stdout)
1092    }
1093
1094    pub async fn restore_database(
1095        &self,
1096        db_instance_identifier: &str,
1097        engine: &str,
1098        username: &str,
1099        password: &str,
1100        db_name: &str,
1101        dump_data: &[u8],
1102    ) -> Result<(), RuntimeError> {
1103        let container = self
1104            .containers
1105            .read()
1106            .get(db_instance_identifier)
1107            .cloned()
1108            .ok_or(RuntimeError::Unavailable)?;
1109
1110        if let Some(k) = &self.k8s {
1111            return k
1112                .restore_database(
1113                    &container.container_id,
1114                    engine,
1115                    username,
1116                    password,
1117                    db_name,
1118                    dump_data,
1119                )
1120                .await;
1121        }
1122
1123        let args: Vec<String> = match engine {
1124            "mysql" | "mariadb" => vec![
1125                "exec".into(),
1126                "-i".into(),
1127                container.container_id.clone(),
1128                "mysql".into(),
1129                "-u".into(),
1130                username.into(),
1131                format!("-p{password}"),
1132                db_name.into(),
1133            ],
1134            "postgres" => vec![
1135                "exec".into(),
1136                "-i".into(),
1137                container.container_id.clone(),
1138                "psql".into(),
1139                "-U".into(),
1140                username.into(),
1141                "-d".into(),
1142                db_name.into(),
1143                "--no-password".into(),
1144                "-v".into(),
1145                "ON_ERROR_STOP=1".into(),
1146            ],
1147            "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
1148            | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
1149                return Err(RuntimeError::ContainerStartFailed(format!(
1150                    "engine {engine} is not yet supported by the snapshot-restore path"
1151                )));
1152            }
1153            other => {
1154                return Err(RuntimeError::ContainerStartFailed(format!(
1155                    "engine {other} is not supported by restore_database"
1156                )));
1157            }
1158        };
1159
1160        let mut child = tokio::process::Command::new(&self.cli)
1161            .args(&args)
1162            .stdin(std::process::Stdio::piped())
1163            .stdout(std::process::Stdio::piped())
1164            .stderr(std::process::Stdio::piped())
1165            .spawn()
1166            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1167
1168        if let Some(mut stdin) = child.stdin.take() {
1169            use tokio::io::AsyncWriteExt;
1170            stdin
1171                .write_all(dump_data)
1172                .await
1173                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1174            drop(stdin);
1175        }
1176
1177        let output = child
1178            .wait_with_output()
1179            .await
1180            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1181
1182        if !output.status.success() {
1183            return Err(RuntimeError::ContainerStartFailed(format!(
1184                "restore failed: {}",
1185                String::from_utf8_lossy(&output.stderr).trim()
1186            )));
1187        }
1188
1189        Ok(())
1190    }
1191}
1192
1193/// Build the prebuilt-image reference for a given engine + major
1194/// version. Uses `<registry>/<image>:<major>-<fakecloud-version>`,
1195/// where the registry comes from `FAKECLOUD_POSTGRES_REGISTRY` (kept
1196/// historical name; defaults to the public `ghcr.io/faiscadev`).
1197/// The version pin guarantees the runtime asks the daemon for the
1198/// same image CI publishes for this fakecloud release; mismatched
1199/// assets force a local rebuild via the fall-through in
1200/// `ensure_bridge_image`.
1201pub(crate) fn bridge_image_tag(image: &str, major_version: &str) -> String {
1202    let registry = std::env::var("FAKECLOUD_POSTGRES_REGISTRY")
1203        .unwrap_or_else(|_| DEFAULT_POSTGRES_REGISTRY.to_string());
1204    bridge_image_tag_with_registry(&registry, image, major_version)
1205}
1206
1207/// Pure tag builder split out from [`bridge_image_tag`] so callers (and
1208/// tests) can supply the registry explicitly. Keeping the env read in the
1209/// thin wrapper means the formatting logic is testable without mutating the
1210/// process-global `FAKECLOUD_POSTGRES_REGISTRY`, which `cargo test`'s parallel
1211/// workers would otherwise race over.
1212fn bridge_image_tag_with_registry(registry: &str, image: &str, major_version: &str) -> String {
1213    let registry = registry.trim_end_matches('/');
1214    format!(
1215        "{}/{}:{}-{}",
1216        registry,
1217        image,
1218        major_version,
1219        env!("CARGO_PKG_VERSION")
1220    )
1221}
1222
// Unit tests for the bridge-image tag builder and for the ordering between
// `stop_container` and container registration. The tag test passes
// registries explicitly; none of these tests read or write environment
// variables.
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises the pure tag builder with explicit registries so the cases
    /// never touch the process-global `FAKECLOUD_POSTGRES_REGISTRY`. Parallel
    /// `cargo test` workers used to race over that env var, which surfaced as
    /// an override silently resolving to the default registry.
    #[test]
    fn bridge_image_tag_resolves_registry_overrides() {
        // Default registry (matches the env-less `bridge_image_tag` path).
        assert_eq!(
            bridge_image_tag_with_registry(DEFAULT_POSTGRES_REGISTRY, "fakecloud-postgres", "16"),
            format!(
                "ghcr.io/faiscadev/fakecloud-postgres:16-{}",
                env!("CARGO_PKG_VERSION")
            )
        );
        assert_eq!(
            bridge_image_tag_with_registry(DEFAULT_POSTGRES_REGISTRY, "fakecloud-mysql", "8.0"),
            format!(
                "ghcr.io/faiscadev/fakecloud-mysql:8.0-{}",
                env!("CARGO_PKG_VERSION")
            )
        );
        assert_eq!(
            bridge_image_tag_with_registry(DEFAULT_POSTGRES_REGISTRY, "fakecloud-mariadb", "10.11"),
            format!(
                "ghcr.io/faiscadev/fakecloud-mariadb:10.11-{}",
                env!("CARGO_PKG_VERSION")
            )
        );

        // Custom registry override.
        assert_eq!(
            bridge_image_tag_with_registry("registry.example.com/team", "fakecloud-postgres", "15"),
            format!(
                "registry.example.com/team/fakecloud-postgres:15-{}",
                env!("CARGO_PKG_VERSION")
            )
        );

        // Trailing slash on the override is trimmed.
        assert_eq!(
            bridge_image_tag_with_registry(
                "registry.example.com/team/",
                "fakecloud-postgres",
                "13"
            ),
            format!(
                "registry.example.com/team/fakecloud-postgres:13-{}",
                env!("CARGO_PKG_VERSION")
            )
        );
    }

    /// Build a `RunningDbContainer` entry with a fixed fake port and a
    /// loopback endpoint. Used to populate the runtime's `containers` map
    /// directly; no real container sits behind it.
    fn running_stub(container_id: &str) -> RunningDbContainer {
        RunningDbContainer {
            container_id: container_id.to_string(),
            host_port: 54321,
            endpoint_address: "127.0.0.1".to_string(),
            endpoint_port: 54321,
        }
    }

    /// 4.3 — the create-during-delete race. DeleteDBInstance calls
    /// `stop_container(id)` while `ensure_postgres` is still mid-flight (the
    /// container not yet registered), so that stop is a no-op. The create
    /// task then registers a live container. Reproduce that exact ordering
    /// and assert the registered container leaks when nothing reaps it —
    /// i.e. the bug the fix closes by re-running `stop_container` in the
    /// create task's instance-gone branch.
    #[tokio::test]
    async fn stop_container_before_registration_is_a_noop_then_registration_leaks() {
        let rt = RdsRuntime::new_stub();

        // Delete arrives first: container not registered yet -> no-op.
        rt.stop_container("db-1").await;
        assert!(
            rt.containers.read().is_empty(),
            "nothing registered yet, stop is a no-op",
        );

        // ensure_postgres finishes and registers the running container.
        rt.containers
            .write()
            .insert("db-1".to_string(), running_stub("container-abc"));

        // Without the fix the create task's instance-gone branch did nothing,
        // so the container stays registered (and the real docker container
        // keeps holding its host port) forever.
        assert_eq!(
            rt.containers.read().len(),
            1,
            "the registered container leaks with no cleanup branch",
        );
    }

    /// 4.3 — the fix: once the create task observes the instance is gone, it
    /// calls `stop_container(id)`. By then the container IS registered, so
    /// the stop actually reaps it and the map ends empty: no zombie backing
    /// container.
    ///
    /// NOTE(review): this assumes `RdsRuntime::new_stub()` configures a CLI
    /// whose `rm` invocation is a harmless no-op (e.g. a binary that always
    /// succeeds). Confirm against `new_stub`.
    #[tokio::test]
    async fn stop_container_after_registration_reaps_orphan_on_delete_during_create() {
        let rt = RdsRuntime::new_stub();

        // Simulate ensure_postgres having registered the just-started
        // container right before the create task checks state.
        rt.containers
            .write()
            .insert("db-1".to_string(), running_stub("container-abc"));

        // The instance-gone branch reaps it.
        rt.stop_container("db-1").await;

        assert!(
            rt.containers.read().is_empty(),
            "stop_container must reap the registered orphan: {:?}",
            rt.containers.read().keys().collect::<Vec<_>>(),
        );
    }
}