1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fakecloud_core::container_net::{detect_container_cli, HostNetworking};
6use parking_lot::RwLock;
7use tokio_postgres::NoTls;
8
9mod k8s;
10
// Build context for the fakecloud Postgres image. Each file is embedded into the
// binary at compile time; `build_postgres_image_local` writes them out under the
// file names used there before running the container CLI's `build` command.
const POSTGRES_DOCKERFILE: &str = include_str!("../../assets/postgres/Dockerfile");
const AWS_COMMONS_CONTROL: &str = include_str!("../../assets/postgres/aws_commons.control");
const AWS_COMMONS_SQL: &str = include_str!("../../assets/postgres/aws_commons--1.1.sql");
const AWS_COMMONS_UPGRADE_SQL: &str =
    include_str!("../../assets/postgres/aws_commons--1.0--1.1.sql");
const AWS_LAMBDA_CONTROL: &str = include_str!("../../assets/postgres/aws_lambda.control");
const AWS_LAMBDA_SQL: &str = include_str!("../../assets/postgres/aws_lambda--1.0.sql");
const AWS_S3_CONTROL: &str = include_str!("../../assets/postgres/aws_s3.control");
const AWS_S3_SQL: &str = include_str!("../../assets/postgres/aws_s3--1.0.sql");

// Build context for the fakecloud MySQL image, consumed by `build_mysql_image_local`.
const MYSQL_DOCKERFILE: &str = include_str!("../../assets/mysql/Dockerfile");
const MYSQL_UDF_C: &str = include_str!("../../assets/mysql/fakecloud_udf.c");
const MYSQL_BOOTSTRAP_SH: &str = include_str!("../../assets/mysql/fakecloud-bootstrap.sh");
const MYSQL_BOOTSTRAP_SQL: &str =
    include_str!("../../assets/mysql/99-fakecloud-bootstrap.sql.tmpl");

// Build context for the fakecloud MariaDB image, consumed by `build_mariadb_image_local`.
const MARIADB_DOCKERFILE: &str = include_str!("../../assets/mariadb/Dockerfile");
const MARIADB_UDF_C: &str = include_str!("../../assets/mariadb/fakecloud_udf.c");
const MARIADB_BOOTSTRAP_SH: &str = include_str!("../../assets/mariadb/fakecloud-bootstrap.sh");
const MARIADB_BOOTSTRAP_SQL: &str =
    include_str!("../../assets/mariadb/99-fakecloud-bootstrap.sql.tmpl");

/// Registry prefix used by `bridge_image_tag` when the `FAKECLOUD_POSTGRES_REGISTRY`
/// environment variable is not set. Despite the name it prefixes the Postgres, MySQL
/// and MariaDB image tags alike.
const DEFAULT_POSTGRES_REGISTRY: &str = "ghcr.io/faiscadev";
40
/// Connection details for one database workload tracked by [`RdsRuntime`].
#[derive(Debug, Clone)]
pub struct RunningDbContainer {
    /// Identifier handed back to the backend for later operations: the container id
    /// printed by the CLI `create` command, or the value passed to `delete_pod` when
    /// the Kubernetes backend is in use.
    pub container_id: String,
    /// Host port the database port is published on (for the CLI backend this is
    /// parsed from the output of the CLI `port` command).
    pub host_port: u16,
    /// Address clients should connect to; the CLI backend fills this from
    /// `HostNetworking::sibling_host`.
    pub endpoint_address: String,
    /// Port clients should connect to; the CLI backend sets it equal to `host_port`.
    pub endpoint_port: u16,
}
54
/// Manages the database containers (or Kubernetes pods) that back emulated RDS instances.
pub struct RdsRuntime {
    /// Container CLI binary to invoke; empty when the Kubernetes backend is used.
    cli: String,
    /// Running workloads keyed by DB instance identifier.
    containers: RwLock<HashMap<String, RunningDbContainer>>,
    /// `fakecloud-<pid>` marker attached to created containers as the
    /// `fakecloud-instance` label.
    instance_id: String,
    /// Host networking details: the alias containers use to reach this server and
    /// the address used to reach published container ports.
    net: HostNetworking,
    /// Port of the fakecloud server, embedded in the `FAKECLOUD_ENDPOINT` variable
    /// passed to the Postgres/MySQL/MariaDB containers.
    server_port: u16,
    /// One async mutex per image tag; the guarded flag records that the tag has
    /// already been resolved (found locally, pulled or built) by this process.
    image_cache: RwLock<HashMap<String, Arc<tokio::sync::Mutex<bool>>>>,
    /// Kubernetes backend; when present, operations are delegated to it instead of the CLI.
    k8s: Option<k8s::K8sDb>,
}
72
/// Errors reported by [`RdsRuntime`] operations.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
    /// In this file: returned when no workload is registered for the requested
    /// DB instance identifier.
    #[error("container runtime is unavailable")]
    Unavailable,
    /// Any failure while invoking the container CLI, waiting for readiness, or
    /// dumping/restoring data; the payload is a human-readable description.
    #[error("container failed to start: {0}")]
    ContainerStartFailed(String),
}
80
/// Errors returned by [`RdsRuntime::new_k8s`] while setting up the Kubernetes backend.
#[derive(Debug, thiserror::Error)]
pub enum BackendInitError {
    /// Wraps a `fakecloud_k8s::K8sEnvError` transparently.
    #[error(transparent)]
    Env(#[from] fakecloud_k8s::K8sEnvError),
    /// Wraps a `fakecloud_k8s::K8sPodConfigError` transparently.
    #[error(transparent)]
    PodConfig(#[from] fakecloud_k8s::K8sPodConfigError),
    /// The cluster could not be reached; the payload describes the failure.
    // NOTE(review): not constructed in this file — presumably produced by the `k8s` module.
    #[error("failed to connect to the Kubernetes cluster: {0}")]
    Connect(String),
}
93
94impl RdsRuntime {
95 pub fn new(server_port: u16) -> Option<Self> {
96 let cli = detect_container_cli()?;
103 let net = HostNetworking::detect(&cli);
104
105 Some(Self {
106 cli,
107 containers: RwLock::new(HashMap::new()),
108 instance_id: format!("fakecloud-{}", std::process::id()),
109 net,
110 server_port,
111 image_cache: RwLock::new(HashMap::new()),
112 k8s: None,
113 })
114 }
115
116 pub async fn new_k8s(server_port: u16) -> Result<Self, BackendInitError> {
120 let db = k8s::K8sDb::from_env(server_port).await?;
121 Ok(Self {
122 cli: String::new(),
123 containers: RwLock::new(HashMap::new()),
124 instance_id: format!("fakecloud-{}", std::process::id()),
125 net: HostNetworking {
129 host_alias: String::new(),
130 add_host_arg: None,
131 sibling_host: "127.0.0.1".to_string(),
132 },
133 server_port,
134 image_cache: RwLock::new(HashMap::new()),
135 k8s: Some(db),
136 })
137 }
138
139 pub fn cli_name(&self) -> &str {
140 if self.k8s.is_some() {
141 "kubernetes"
142 } else {
143 &self.cli
144 }
145 }
146
147 #[cfg(test)]
153 pub(crate) fn new_stub() -> Self {
154 Self {
155 cli: "true".to_string(),
156 containers: RwLock::new(HashMap::new()),
157 instance_id: format!("fakecloud-{}", std::process::id()),
158 net: HostNetworking {
159 host_alias: String::new(),
160 add_host_arg: None,
161 sibling_host: "127.0.0.1".to_string(),
162 },
163 server_port: 0,
164 image_cache: RwLock::new(HashMap::new()),
165 k8s: None,
166 }
167 }
168
169 pub async fn reap_stale(&self) {
172 if let Some(k) = &self.k8s {
173 k.reap_stale().await;
174 }
175 }
176
177 #[allow(clippy::too_many_arguments)]
178 pub async fn ensure_postgres(
179 &self,
180 db_instance_identifier: &str,
181 engine: &str,
182 engine_version: &str,
183 username: &str,
184 password: &str,
185 db_name: &str,
186 account_id: &str,
187 region: &str,
188 tags: &[crate::state::RdsTag],
189 ) -> Result<RunningDbContainer, RuntimeError> {
190 if let Some(k) = &self.k8s {
191 let tag_map: std::collections::BTreeMap<String, String> = tags
195 .iter()
196 .map(|t| (t.key.clone(), t.value.clone()))
197 .collect();
198 let running = k
199 .ensure(
200 db_instance_identifier,
201 engine,
202 engine_version,
203 username,
204 password,
205 db_name,
206 account_id,
207 region,
208 &tag_map,
209 )
210 .await?;
211 self.containers
212 .write()
213 .insert(db_instance_identifier.to_string(), running.clone());
214 return Ok(running);
215 }
216 self.stop_container(db_instance_identifier).await;
217
218 let (image, port, env_vars, bridge_engine_version) = match engine {
226 "postgres" => {
227 let major_version = engine_version.split('.').next().unwrap_or("16");
228 let image = self.ensure_postgres_image(major_version).await?;
229 let env_vars = vec![
230 format!("POSTGRES_USER={username}"),
231 format!("POSTGRES_PASSWORD={password}"),
232 format!("POSTGRES_DB={db_name}"),
233 format!(
234 "FAKECLOUD_ENDPOINT=http://{}:{}",
235 self.net.host_alias, self.server_port
236 ),
237 format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
238 format!("FAKECLOUD_REGION={region}"),
239 ];
240 (image, "5432", env_vars, Some(major_version.to_string()))
241 }
242 "mysql" => {
243 let _ = engine_version;
248 let major_version = "8.0";
249 let image = self.ensure_mysql_image(major_version).await?;
250 let env_vars = vec![
251 format!("MYSQL_ROOT_PASSWORD={password}"),
252 format!("MYSQL_USER={username}"),
253 format!("MYSQL_PASSWORD={password}"),
254 format!("MYSQL_DATABASE={db_name}"),
255 format!(
256 "FAKECLOUD_ENDPOINT=http://{}:{}",
257 self.net.host_alias, self.server_port
258 ),
259 format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
260 format!("FAKECLOUD_REGION={region}"),
261 ];
262 (image, "3306", env_vars, Some(major_version.to_string()))
263 }
264 "mariadb" => {
265 let major_version = if engine_version.starts_with("10.11") {
266 "10.11"
267 } else if engine_version.starts_with("11.4") {
268 "11.4"
269 } else {
270 "10.6"
271 };
272 let image = self.ensure_mariadb_image(major_version).await?;
273 let env_vars = vec![
274 format!("MARIADB_ROOT_PASSWORD={password}"),
275 format!("MARIADB_USER={username}"),
276 format!("MARIADB_PASSWORD={password}"),
277 format!("MARIADB_DATABASE={db_name}"),
278 format!(
279 "FAKECLOUD_ENDPOINT=http://{}:{}",
280 self.net.host_alias, self.server_port
281 ),
282 format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
283 format!("FAKECLOUD_REGION={region}"),
284 ];
285 (image, "3306", env_vars, Some(major_version.to_string()))
286 }
287 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
288 let image = "gvenzl/oracle-free:23-slim".to_string();
293 let env_vars = vec![
294 format!("ORACLE_PASSWORD={password}"),
295 format!("APP_USER={username}"),
296 format!("APP_USER_PASSWORD={password}"),
297 format!("ORACLE_DATABASE={db_name}"),
298 ];
299 (image, "1521", env_vars, None)
300 }
301 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
302 let image = "mcr.microsoft.com/mssql/server:2022-latest".to_string();
308 let env_vars = vec![
309 "ACCEPT_EULA=Y".to_string(),
310 format!("MSSQL_SA_PASSWORD={password}"),
311 "MSSQL_PID=Express".to_string(),
312 ];
313 (image, "1433", env_vars, None)
314 }
315 "db2-se" | "db2-ae" => {
316 let image = "icr.io/db2_community/db2:latest".to_string();
320 let env_vars = vec![
321 "LICENSE=accept".to_string(),
322 "DB2INSTANCE=db2inst1".to_string(),
323 format!("DB2INST1_PASSWORD={password}"),
324 format!("DBNAME={db_name}"),
325 ];
326 (image, "50000", env_vars, None)
327 }
328 _ => {
329 return Err(RuntimeError::ContainerStartFailed(format!(
330 "Unsupported engine: {}",
331 engine
332 )))
333 }
334 };
335
336 let needs_privileged = matches!(engine, "db2-se" | "db2-ae");
338
339 let mut args = vec![
341 "create".to_string(),
342 "-p".to_string(),
343 format!(":{}", port),
344 "--label".to_string(),
345 format!("fakecloud-rds={db_instance_identifier}"),
346 "--label".to_string(),
347 format!("fakecloud-instance={}", self.instance_id),
348 ];
349
350 if needs_privileged {
351 args.push("--privileged".to_string());
352 }
353
354 if bridge_engine_version.is_some() {
361 self.net.push_add_host_args(&mut args);
362 }
363
364 for env_var in env_vars {
365 args.push("-e".to_string());
366 args.push(env_var);
367 }
368
369 args.push(image);
370
371 let output = tokio::process::Command::new(&self.cli)
372 .args(&args)
373 .output()
374 .await
375 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
376
377 if !output.status.success() {
378 return Err(RuntimeError::ContainerStartFailed(
379 String::from_utf8_lossy(&output.stderr).trim().to_string(),
380 ));
381 }
382
383 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
384 let start_result = tokio::process::Command::new(&self.cli)
385 .args(["start", &container_id])
386 .output()
387 .await
388 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
389
390 if !start_result.status.success() {
391 self.remove_container(&container_id).await;
392 return Err(RuntimeError::ContainerStartFailed(format!(
393 "container start failed: {}",
394 String::from_utf8_lossy(&start_result.stderr).trim()
395 )));
396 }
397
398 let host_port = match self.lookup_port(&container_id, port).await {
399 Ok(host_port) => host_port,
400 Err(error) => {
401 self.remove_container(&container_id).await;
402 return Err(error);
403 }
404 };
405
406 let wait_result = match engine {
408 "postgres" => {
409 self.wait_for_postgres(username, password, db_name, host_port)
410 .await
411 }
412 "mysql" | "mariadb" => {
413 self.wait_for_mysql(username, password, db_name, host_port)
414 .await
415 }
416 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
417 self.wait_for_oracle(&container_id, host_port).await
418 }
419 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
420 self.wait_for_sqlserver(&container_id, host_port).await
421 }
422 "db2-se" | "db2-ae" => self.wait_for_db2(&container_id, host_port).await,
423 _ => unreachable!("engine already validated"),
424 };
425
426 if let Err(error) = wait_result {
427 self.remove_container(&container_id).await;
428 return Err(error);
429 }
430
431 let running = RunningDbContainer {
432 container_id,
433 host_port,
434 endpoint_address: self.net.sibling_host.clone(),
438 endpoint_port: host_port,
439 };
440 self.containers
441 .write()
442 .insert(db_instance_identifier.to_string(), running.clone());
443 Ok(running)
444 }
445
446 pub async fn stop_container(&self, db_instance_identifier: &str) {
447 let container = self.containers.write().remove(db_instance_identifier);
448 if let Some(container) = container {
449 if let Some(k) = &self.k8s {
450 k.delete_pod(&container.container_id).await;
451 } else {
452 self.remove_container(&container.container_id).await;
453 }
454 }
455 }
456
457 pub async fn restart_container(
458 &self,
459 db_instance_identifier: &str,
460 engine: &str,
461 username: &str,
462 password: &str,
463 db_name: &str,
464 ) -> Result<RunningDbContainer, RuntimeError> {
465 if let Some(k) = &self.k8s {
466 let running = k
467 .restart(db_instance_identifier, engine, username, password, db_name)
468 .await?;
469 self.containers
470 .write()
471 .insert(db_instance_identifier.to_string(), running.clone());
472 return Ok(running);
473 }
474 let running = self
475 .containers
476 .read()
477 .get(db_instance_identifier)
478 .cloned()
479 .ok_or(RuntimeError::Unavailable)?;
480
481 let output = tokio::process::Command::new(&self.cli)
482 .args(["restart", &running.container_id])
483 .output()
484 .await
485 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
486
487 if !output.status.success() {
488 return Err(RuntimeError::ContainerStartFailed(format!(
489 "container restart failed: {}",
490 String::from_utf8_lossy(&output.stderr).trim()
491 )));
492 }
493
494 let port = match engine {
495 "postgres" => "5432",
496 "mysql" | "mariadb" => "3306",
497 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "1521",
498 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "1433",
499 "db2-se" | "db2-ae" => "50000",
500 _ => "5432", };
502
503 let host_port = self.lookup_port(&running.container_id, port).await?;
504
505 match engine {
506 "postgres" => {
507 self.wait_for_postgres(username, password, db_name, host_port)
508 .await?
509 }
510 "mysql" | "mariadb" => {
511 self.wait_for_mysql(username, password, db_name, host_port)
512 .await?
513 }
514 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
515 self.wait_for_oracle(&running.container_id, host_port)
516 .await?
517 }
518 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
519 self.wait_for_sqlserver(&running.container_id, host_port)
520 .await?
521 }
522 "db2-se" | "db2-ae" => self.wait_for_db2(&running.container_id, host_port).await?,
523 _ => {
524 self.wait_for_postgres(username, password, db_name, host_port)
525 .await?
526 }
527 };
528 let running = RunningDbContainer {
529 container_id: running.container_id,
530 host_port,
531 endpoint_address: self.net.sibling_host.clone(),
532 endpoint_port: host_port,
533 };
534 self.containers
535 .write()
536 .insert(db_instance_identifier.to_string(), running.clone());
537 Ok(running)
538 }
539
540 pub async fn stop_all(&self) {
541 let containers: Vec<String> = {
542 let mut containers = self.containers.write();
543 containers
544 .drain()
545 .map(|(_, container)| container.container_id)
546 .collect()
547 };
548 for container_id in containers {
549 self.remove_container(&container_id).await;
550 }
551 }
552
553 async fn lookup_port(&self, container_id: &str, port: &str) -> Result<u16, RuntimeError> {
554 let port_output = tokio::process::Command::new(&self.cli)
555 .args(["port", container_id, port])
556 .output()
557 .await
558 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
559
560 let port_str = String::from_utf8_lossy(&port_output.stdout);
561 port_str
562 .trim()
563 .rsplit(':')
564 .next()
565 .and_then(|value| value.parse::<u16>().ok())
566 .ok_or_else(|| {
567 RuntimeError::ContainerStartFailed(format!(
568 "could not determine container port from '{}'",
569 port_str.trim()
570 ))
571 })
572 }
573
574 async fn wait_for_postgres(
575 &self,
576 username: &str,
577 password: &str,
578 db_name: &str,
579 host_port: u16,
580 ) -> Result<(), RuntimeError> {
581 for _ in 0..40 {
582 tokio::time::sleep(Duration::from_millis(500)).await;
583 let connection_string = format!(
584 "host={} port={host_port} user={username} password={password} dbname={db_name}",
585 self.net.sibling_host
586 );
587 if let Ok((client, connection)) =
588 tokio_postgres::connect(&connection_string, NoTls).await
589 {
590 tokio::spawn(async move {
591 let _ = connection.await;
592 });
593 if client.simple_query("SELECT 1").await.is_ok() {
594 return Ok(());
595 }
596 }
597 }
598
599 Err(RuntimeError::ContainerStartFailed(
600 "postgres container did not become ready within 20 seconds".to_string(),
601 ))
602 }
603
604 async fn wait_for_mysql(
605 &self,
606 username: &str,
607 password: &str,
608 db_name: &str,
609 host_port: u16,
610 ) -> Result<(), RuntimeError> {
611 use mysql_async::prelude::*;
612 use mysql_async::OptsBuilder;
613
614 for attempt in 1..=40 {
615 let opts = OptsBuilder::default()
616 .ip_or_hostname(self.net.sibling_host.as_str())
617 .tcp_port(host_port)
618 .user(Some(username))
619 .pass(Some(password))
620 .db_name(Some(db_name));
621
622 match mysql_async::Conn::new(opts).await {
623 Ok(mut conn) => {
624 if conn.query_drop("SELECT 1").await.is_ok() {
625 let _ = conn.disconnect().await;
626 return Ok(());
627 }
628 }
629 Err(_) => {
630 if attempt < 40 {
631 tokio::time::sleep(Duration::from_millis(500)).await;
632 }
633 continue;
634 }
635 }
636 }
637
638 Err(RuntimeError::ContainerStartFailed(
639 "MySQL/MariaDB container did not become ready within 20 seconds".to_string(),
640 ))
641 }
642
643 async fn wait_for_oracle(
649 &self,
650 container_id: &str,
651 host_port: u16,
652 ) -> Result<(), RuntimeError> {
653 self.wait_for_log_marker(container_id, "DATABASE IS READY TO USE!", 240)
654 .await?;
655 self.wait_for_tcp(host_port, 30).await
656 }
657
658 async fn wait_for_sqlserver(
662 &self,
663 container_id: &str,
664 host_port: u16,
665 ) -> Result<(), RuntimeError> {
666 self.wait_for_log_marker(
667 container_id,
668 "SQL Server is now ready for client connections",
669 180,
670 )
671 .await?;
672 self.wait_for_tcp(host_port, 30).await
673 }
674
675 async fn wait_for_db2(&self, container_id: &str, host_port: u16) -> Result<(), RuntimeError> {
679 self.wait_for_log_marker(container_id, "Setup has completed", 360)
680 .await?;
681 self.wait_for_tcp(host_port, 60).await
682 }
683
684 async fn wait_for_log_marker(
687 &self,
688 container_id: &str,
689 marker: &str,
690 deadline_secs: u64,
691 ) -> Result<(), RuntimeError> {
692 let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
693 while std::time::Instant::now() < deadline {
694 let output = tokio::process::Command::new(&self.cli)
695 .args(["logs", container_id])
696 .output()
697 .await
698 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
699 let stdout = String::from_utf8_lossy(&output.stdout);
700 let stderr = String::from_utf8_lossy(&output.stderr);
701 if stdout.contains(marker) || stderr.contains(marker) {
702 return Ok(());
703 }
704 tokio::time::sleep(Duration::from_secs(2)).await;
705 }
706 Err(RuntimeError::ContainerStartFailed(format!(
707 "container did not log '{}' within {} seconds",
708 marker, deadline_secs
709 )))
710 }
711
712 async fn wait_for_tcp(&self, host_port: u16, deadline_secs: u64) -> Result<(), RuntimeError> {
716 let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
717 let host = self.net.sibling_host.as_str();
718 while std::time::Instant::now() < deadline {
719 if tokio::net::TcpStream::connect((host, host_port))
720 .await
721 .is_ok()
722 {
723 return Ok(());
724 }
725 tokio::time::sleep(Duration::from_millis(500)).await;
726 }
727 Err(RuntimeError::ContainerStartFailed(format!(
728 "TCP probe to {}:{} did not succeed within {}s",
729 host, host_port, deadline_secs
730 )))
731 }
732
733 async fn remove_container(&self, container_id: &str) {
734 let _ = tokio::process::Command::new(&self.cli)
735 .args(["rm", "-f", container_id])
736 .output()
737 .await;
738 }
739
740 pub(crate) async fn ensure_postgres_image(
761 &self,
762 major_version: &str,
763 ) -> Result<String, RuntimeError> {
764 let tag = bridge_image_tag("fakecloud-postgres", major_version);
765 self.ensure_bridge_image(&tag, |tag| async move {
766 self.build_postgres_image_local(major_version, &tag).await
767 })
768 .await
769 }
770
771 async fn docker_image_exists(&self, tag: &str) -> bool {
772 tokio::process::Command::new(&self.cli)
773 .args(["image", "inspect", tag])
774 .stdout(std::process::Stdio::null())
775 .stderr(std::process::Stdio::null())
776 .status()
777 .await
778 .map(|status| status.success())
779 .unwrap_or(false)
780 }
781
782 async fn try_pull_image(&self, tag: &str) -> bool {
783 tracing::info!(tag = %tag, "Pulling prebuilt fakecloud-postgres image");
784 let output = match tokio::process::Command::new(&self.cli)
785 .args(["pull", tag])
786 .output()
787 .await
788 {
789 Ok(output) => output,
790 Err(e) => {
791 tracing::debug!(tag = %tag, error = %e, "docker pull failed to spawn");
792 return false;
793 }
794 };
795 if output.status.success() {
796 return true;
797 }
798 tracing::info!(
799 tag = %tag,
800 stderr = %String::from_utf8_lossy(&output.stderr).trim(),
801 "Prebuilt postgres image not available, falling back to local build"
802 );
803 false
804 }
805
806 async fn build_postgres_image_local(
807 &self,
808 major_version: &str,
809 tag: &str,
810 ) -> Result<(), RuntimeError> {
811 let assets: [(&str, &str); 8] = [
812 ("Dockerfile", POSTGRES_DOCKERFILE),
813 ("aws_commons.control", AWS_COMMONS_CONTROL),
814 ("aws_commons--1.1.sql", AWS_COMMONS_SQL),
815 ("aws_commons--1.0--1.1.sql", AWS_COMMONS_UPGRADE_SQL),
816 ("aws_lambda.control", AWS_LAMBDA_CONTROL),
817 ("aws_lambda--1.0.sql", AWS_LAMBDA_SQL),
818 ("aws_s3.control", AWS_S3_CONTROL),
819 ("aws_s3--1.0.sql", AWS_S3_SQL),
820 ];
821 self.build_image_local(
822 tag,
823 &assets,
824 &format!("PG_VERSION={major_version}"),
825 "fakecloud-postgres",
826 )
827 .await
828 }
829
830 pub(crate) async fn ensure_mysql_image(
835 &self,
836 major_version: &str,
837 ) -> Result<String, RuntimeError> {
838 let tag = bridge_image_tag("fakecloud-mysql", major_version);
839 self.ensure_bridge_image(&tag, |tag| async move {
840 self.build_mysql_image_local(major_version, &tag).await
841 })
842 .await
843 }
844
845 pub(crate) async fn ensure_mariadb_image(
846 &self,
847 major_version: &str,
848 ) -> Result<String, RuntimeError> {
849 let tag = bridge_image_tag("fakecloud-mariadb", major_version);
850 self.ensure_bridge_image(&tag, |tag| async move {
851 self.build_mariadb_image_local(major_version, &tag).await
852 })
853 .await
854 }
855
856 async fn ensure_bridge_image<F, Fut>(
861 &self,
862 tag: &str,
863 build_local: F,
864 ) -> Result<String, RuntimeError>
865 where
866 F: FnOnce(String) -> Fut,
867 Fut: std::future::Future<Output = Result<(), RuntimeError>>,
868 {
869 let lock = {
870 let mut cache = self.image_cache.write();
871 cache
872 .entry(tag.to_string())
873 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(false)))
874 .clone()
875 };
876 let mut resolved = lock.lock().await;
877 if *resolved {
878 return Ok(tag.to_string());
879 }
880
881 let force_rebuild = std::env::var("FAKECLOUD_REBUILD_POSTGRES_IMAGE")
882 .map(|v| !v.is_empty())
883 .unwrap_or(false);
884
885 if !force_rebuild {
886 if self.docker_image_exists(tag).await {
887 *resolved = true;
888 return Ok(tag.to_string());
889 }
890 if self.try_pull_image(tag).await {
891 *resolved = true;
892 return Ok(tag.to_string());
893 }
894 }
895
896 build_local(tag.to_string()).await?;
897 *resolved = true;
898 Ok(tag.to_string())
899 }
900
901 async fn build_mysql_image_local(
902 &self,
903 major_version: &str,
904 tag: &str,
905 ) -> Result<(), RuntimeError> {
906 let assets: [(&str, &str); 4] = [
907 ("Dockerfile", MYSQL_DOCKERFILE),
908 ("fakecloud_udf.c", MYSQL_UDF_C),
909 ("fakecloud-bootstrap.sh", MYSQL_BOOTSTRAP_SH),
910 ("99-fakecloud-bootstrap.sql.tmpl", MYSQL_BOOTSTRAP_SQL),
911 ];
912 self.build_image_local(
913 tag,
914 &assets,
915 &format!("MYSQL_VERSION={major_version}"),
916 "fakecloud-mysql",
917 )
918 .await
919 }
920
921 async fn build_mariadb_image_local(
922 &self,
923 major_version: &str,
924 tag: &str,
925 ) -> Result<(), RuntimeError> {
926 let assets: [(&str, &str); 4] = [
927 ("Dockerfile", MARIADB_DOCKERFILE),
928 ("fakecloud_udf.c", MARIADB_UDF_C),
929 ("fakecloud-bootstrap.sh", MARIADB_BOOTSTRAP_SH),
930 ("99-fakecloud-bootstrap.sql.tmpl", MARIADB_BOOTSTRAP_SQL),
931 ];
932 self.build_image_local(
933 tag,
934 &assets,
935 &format!("MARIADB_VERSION={major_version}"),
936 "fakecloud-mariadb",
937 )
938 .await
939 }
940
941 async fn build_image_local(
942 &self,
943 tag: &str,
944 assets: &[(&str, &str)],
945 build_arg: &str,
946 image_label: &str,
947 ) -> Result<(), RuntimeError> {
948 let build_dir =
949 tempfile::tempdir().map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
950 for (name, contents) in assets {
951 tokio::fs::write(build_dir.path().join(name), contents)
952 .await
953 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
954 }
955
956 tracing::info!(
957 tag = %tag,
958 image = %image_label,
959 "Building {image_label} image locally (first use can take ~60s)"
960 );
961
962 let output = tokio::process::Command::new(&self.cli)
963 .args(["build", "--build-arg", build_arg, "-t", tag, "."])
964 .current_dir(build_dir.path())
965 .output()
966 .await
967 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
968
969 if !output.status.success() {
970 return Err(RuntimeError::ContainerStartFailed(format!(
971 "docker build for {} failed: {}",
972 tag,
973 String::from_utf8_lossy(&output.stderr).trim()
974 )));
975 }
976
977 Ok(())
978 }
979
980 pub async fn dump_database(
981 &self,
982 db_instance_identifier: &str,
983 engine: &str,
984 username: &str,
985 password: &str,
986 db_name: &str,
987 ) -> Result<Vec<u8>, RuntimeError> {
988 let container = self
989 .containers
990 .read()
991 .get(db_instance_identifier)
992 .cloned()
993 .ok_or(RuntimeError::Unavailable)?;
994
995 if let Some(k) = &self.k8s {
996 return k
997 .dump_database(&container.container_id, engine, username, password, db_name)
998 .await;
999 }
1000
1001 let args: Vec<String> = match engine {
1002 "mysql" | "mariadb" => vec![
1003 "exec".into(),
1004 container.container_id.clone(),
1005 "mysqldump".into(),
1006 "-u".into(),
1007 username.into(),
1008 format!("-p{password}"),
1009 db_name.into(),
1010 ],
1011 "postgres" => vec![
1012 "exec".into(),
1013 container.container_id.clone(),
1014 "pg_dump".into(),
1015 "-U".into(),
1016 username.into(),
1017 "-d".into(),
1018 db_name.into(),
1019 "--no-password".into(),
1020 ],
1021 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
1028 | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
1029 return Err(RuntimeError::ContainerStartFailed(format!(
1030 "engine {engine} is not yet supported by the snapshot/read-replica path; \
1031 emulator stores the API state but cannot dump the underlying database"
1032 )));
1033 }
1034 other => {
1035 return Err(RuntimeError::ContainerStartFailed(format!(
1036 "engine {other} is not supported by dump_database"
1037 )));
1038 }
1039 };
1040
1041 let output = tokio::process::Command::new(&self.cli)
1042 .args(&args)
1043 .output()
1044 .await
1045 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1046
1047 if !output.status.success() {
1048 return Err(RuntimeError::ContainerStartFailed(format!(
1049 "dump failed: {}",
1050 String::from_utf8_lossy(&output.stderr).trim()
1051 )));
1052 }
1053
1054 Ok(output.stdout)
1055 }
1056
1057 pub async fn read_log_file(
1063 &self,
1064 db_instance_identifier: &str,
1065 container_path: &str,
1066 ) -> Result<Vec<u8>, RuntimeError> {
1067 let container = self
1068 .containers
1069 .read()
1070 .get(db_instance_identifier)
1071 .cloned()
1072 .ok_or(RuntimeError::Unavailable)?;
1073
1074 if let Some(k) = &self.k8s {
1075 return k.read_file(&container.container_id, container_path).await;
1076 }
1077
1078 let output = tokio::process::Command::new(&self.cli)
1079 .args(["exec", &container.container_id, "cat", container_path])
1080 .output()
1081 .await
1082 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1083
1084 if !output.status.success() {
1085 return Err(RuntimeError::ContainerStartFailed(format!(
1086 "cat {container_path} failed: {}",
1087 String::from_utf8_lossy(&output.stderr).trim()
1088 )));
1089 }
1090
1091 Ok(output.stdout)
1092 }
1093
1094 pub async fn restore_database(
1095 &self,
1096 db_instance_identifier: &str,
1097 engine: &str,
1098 username: &str,
1099 password: &str,
1100 db_name: &str,
1101 dump_data: &[u8],
1102 ) -> Result<(), RuntimeError> {
1103 let container = self
1104 .containers
1105 .read()
1106 .get(db_instance_identifier)
1107 .cloned()
1108 .ok_or(RuntimeError::Unavailable)?;
1109
1110 if let Some(k) = &self.k8s {
1111 return k
1112 .restore_database(
1113 &container.container_id,
1114 engine,
1115 username,
1116 password,
1117 db_name,
1118 dump_data,
1119 )
1120 .await;
1121 }
1122
1123 let args: Vec<String> = match engine {
1124 "mysql" | "mariadb" => vec![
1125 "exec".into(),
1126 "-i".into(),
1127 container.container_id.clone(),
1128 "mysql".into(),
1129 "-u".into(),
1130 username.into(),
1131 format!("-p{password}"),
1132 db_name.into(),
1133 ],
1134 "postgres" => vec![
1135 "exec".into(),
1136 "-i".into(),
1137 container.container_id.clone(),
1138 "psql".into(),
1139 "-U".into(),
1140 username.into(),
1141 "-d".into(),
1142 db_name.into(),
1143 "--no-password".into(),
1144 "-v".into(),
1145 "ON_ERROR_STOP=1".into(),
1146 ],
1147 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
1148 | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
1149 return Err(RuntimeError::ContainerStartFailed(format!(
1150 "engine {engine} is not yet supported by the snapshot-restore path"
1151 )));
1152 }
1153 other => {
1154 return Err(RuntimeError::ContainerStartFailed(format!(
1155 "engine {other} is not supported by restore_database"
1156 )));
1157 }
1158 };
1159
1160 let mut child = tokio::process::Command::new(&self.cli)
1161 .args(&args)
1162 .stdin(std::process::Stdio::piped())
1163 .stdout(std::process::Stdio::piped())
1164 .stderr(std::process::Stdio::piped())
1165 .spawn()
1166 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1167
1168 if let Some(mut stdin) = child.stdin.take() {
1169 use tokio::io::AsyncWriteExt;
1170 stdin
1171 .write_all(dump_data)
1172 .await
1173 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1174 drop(stdin);
1175 }
1176
1177 let output = child
1178 .wait_with_output()
1179 .await
1180 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1181
1182 if !output.status.success() {
1183 return Err(RuntimeError::ContainerStartFailed(format!(
1184 "restore failed: {}",
1185 String::from_utf8_lossy(&output.stderr).trim()
1186 )));
1187 }
1188
1189 Ok(())
1190 }
1191}
1192
1193pub(crate) fn bridge_image_tag(image: &str, major_version: &str) -> String {
1202 let registry = std::env::var("FAKECLOUD_POSTGRES_REGISTRY")
1203 .unwrap_or_else(|_| DEFAULT_POSTGRES_REGISTRY.to_string());
1204 bridge_image_tag_with_registry(®istry, image, major_version)
1205}
1206
1207fn bridge_image_tag_with_registry(registry: &str, image: &str, major_version: &str) -> String {
1213 let registry = registry.trim_end_matches('/');
1214 format!(
1215 "{}/{}:{}-{}",
1216 registry,
1217 image,
1218 major_version,
1219 env!("CARGO_PKG_VERSION")
1220 )
1221}
1222
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers the default registry for all three images, a custom registry, and
    /// trimming of a trailing slash on the registry.
    #[test]
    fn bridge_image_tag_resolves_registry_overrides() {
        // (registry, image, major version, expected tag without the crate-version suffix)
        let cases = [
            (
                DEFAULT_POSTGRES_REGISTRY,
                "fakecloud-postgres",
                "16",
                "ghcr.io/faiscadev/fakecloud-postgres:16",
            ),
            (
                DEFAULT_POSTGRES_REGISTRY,
                "fakecloud-mysql",
                "8.0",
                "ghcr.io/faiscadev/fakecloud-mysql:8.0",
            ),
            (
                DEFAULT_POSTGRES_REGISTRY,
                "fakecloud-mariadb",
                "10.11",
                "ghcr.io/faiscadev/fakecloud-mariadb:10.11",
            ),
            (
                "registry.example.com/team",
                "fakecloud-postgres",
                "15",
                "registry.example.com/team/fakecloud-postgres:15",
            ),
            (
                "registry.example.com/team/",
                "fakecloud-postgres",
                "13",
                "registry.example.com/team/fakecloud-postgres:13",
            ),
        ];

        for (registry, image, major_version, expected_prefix) in cases.iter() {
            let expected = format!("{}-{}", expected_prefix, env!("CARGO_PKG_VERSION"));
            assert_eq!(
                bridge_image_tag_with_registry(registry, image, major_version),
                expected
            );
        }
    }

    /// Registry entry with fixed placeholder connection details.
    fn running_stub(container_id: &str) -> RunningDbContainer {
        let port = 54321;
        RunningDbContainer {
            container_id: container_id.to_string(),
            host_port: port,
            endpoint_address: "127.0.0.1".to_string(),
            endpoint_port: port,
        }
    }

    /// Stopping before the container is registered finds nothing to remove, so a
    /// registration that happens afterwards stays in the map.
    #[tokio::test]
    async fn stop_container_before_registration_is_a_noop_then_registration_leaks() {
        let runtime = RdsRuntime::new_stub();

        runtime.stop_container("db-1").await;
        assert!(
            runtime.containers.read().is_empty(),
            "nothing registered yet, stop is a no-op",
        );

        runtime
            .containers
            .write()
            .insert("db-1".to_string(), running_stub("container-abc"));

        let registered = runtime.containers.read().len();
        assert_eq!(
            registered, 1,
            "the registered container leaks with no cleanup branch",
        );
    }

    /// Stopping after registration removes the entry from the map.
    #[tokio::test]
    async fn stop_container_after_registration_reaps_orphan_on_delete_during_create() {
        let runtime = RdsRuntime::new_stub();

        runtime
            .containers
            .write()
            .insert("db-1".to_string(), running_stub("container-abc"));

        runtime.stop_container("db-1").await;

        assert!(
            runtime.containers.read().is_empty(),
            "stop_container must reap the registered orphan: {:?}",
            runtime.containers.read().keys().collect::<Vec<_>>(),
        );
    }
}