1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use parking_lot::RwLock;
6use tokio_postgres::NoTls;
7
8mod k8s;
9
10const POSTGRES_DOCKERFILE: &str = include_str!("../../assets/postgres/Dockerfile");
11const AWS_COMMONS_CONTROL: &str = include_str!("../../assets/postgres/aws_commons.control");
12const AWS_COMMONS_SQL: &str = include_str!("../../assets/postgres/aws_commons--1.1.sql");
13const AWS_COMMONS_UPGRADE_SQL: &str =
14 include_str!("../../assets/postgres/aws_commons--1.0--1.1.sql");
15const AWS_LAMBDA_CONTROL: &str = include_str!("../../assets/postgres/aws_lambda.control");
16const AWS_LAMBDA_SQL: &str = include_str!("../../assets/postgres/aws_lambda--1.0.sql");
17const AWS_S3_CONTROL: &str = include_str!("../../assets/postgres/aws_s3.control");
18const AWS_S3_SQL: &str = include_str!("../../assets/postgres/aws_s3--1.0.sql");
19
20const MYSQL_DOCKERFILE: &str = include_str!("../../assets/mysql/Dockerfile");
21const MYSQL_UDF_C: &str = include_str!("../../assets/mysql/fakecloud_udf.c");
22const MYSQL_BOOTSTRAP_SH: &str = include_str!("../../assets/mysql/fakecloud-bootstrap.sh");
23const MYSQL_BOOTSTRAP_SQL: &str =
24 include_str!("../../assets/mysql/99-fakecloud-bootstrap.sql.tmpl");
25
26const MARIADB_DOCKERFILE: &str = include_str!("../../assets/mariadb/Dockerfile");
27const MARIADB_UDF_C: &str = include_str!("../../assets/mariadb/fakecloud_udf.c");
28const MARIADB_BOOTSTRAP_SH: &str = include_str!("../../assets/mariadb/fakecloud-bootstrap.sh");
29const MARIADB_BOOTSTRAP_SQL: &str =
30 include_str!("../../assets/mariadb/99-fakecloud-bootstrap.sql.tmpl");
31
32const DEFAULT_POSTGRES_REGISTRY: &str = "ghcr.io/faiscadev";
39
40#[derive(Debug, Clone)]
41pub struct RunningDbContainer {
42 pub container_id: String,
44 pub host_port: u16,
46 pub endpoint_address: String,
49 pub endpoint_port: u16,
52}
53
54pub struct RdsRuntime {
55 cli: String,
56 containers: RwLock<HashMap<String, RunningDbContainer>>,
57 instance_id: String,
58 host_ip: String,
59 server_port: u16,
60 image_cache: RwLock<HashMap<String, Arc<tokio::sync::Mutex<bool>>>>,
61 k8s: Option<k8s::K8sDb>,
65}
66
67#[derive(Debug, thiserror::Error)]
68pub enum RuntimeError {
69 #[error("container runtime is unavailable")]
70 Unavailable,
71 #[error("container failed to start: {0}")]
72 ContainerStartFailed(String),
73}
74
75#[derive(Debug, thiserror::Error)]
79pub enum BackendInitError {
80 #[error(transparent)]
81 Env(#[from] fakecloud_k8s::K8sEnvError),
82 #[error("failed to connect to the Kubernetes cluster: {0}")]
83 Connect(String),
84}
85
86impl RdsRuntime {
87 pub fn new(server_port: u16) -> Option<Self> {
88 let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
89 if cli_available(&cli) {
90 cli
91 } else {
92 return None;
93 }
94 } else if cli_available("docker") {
95 "docker".to_string()
96 } else if cli_available("podman") {
97 "podman".to_string()
98 } else {
99 return None;
100 };
101
102 let host_ip = if cfg!(target_os = "linux") {
106 detect_bridge_gateway(&cli).unwrap_or_else(|| "172.17.0.1".to_string())
107 } else {
108 "host-gateway".to_string()
109 };
110
111 Some(Self {
112 cli,
113 containers: RwLock::new(HashMap::new()),
114 instance_id: format!("fakecloud-{}", std::process::id()),
115 host_ip,
116 server_port,
117 image_cache: RwLock::new(HashMap::new()),
118 k8s: None,
119 })
120 }
121
122 pub async fn new_k8s(server_port: u16) -> Result<Self, BackendInitError> {
126 let db = k8s::K8sDb::from_env(server_port).await?;
127 Ok(Self {
128 cli: String::new(),
129 containers: RwLock::new(HashMap::new()),
130 instance_id: format!("fakecloud-{}", std::process::id()),
131 host_ip: String::new(),
132 server_port,
133 image_cache: RwLock::new(HashMap::new()),
134 k8s: Some(db),
135 })
136 }
137
138 pub fn cli_name(&self) -> &str {
139 if self.k8s.is_some() {
140 "kubernetes"
141 } else {
142 &self.cli
143 }
144 }
145
146 pub async fn reap_stale(&self) {
149 if let Some(k) = &self.k8s {
150 k.reap_stale().await;
151 }
152 }
153
154 #[allow(clippy::too_many_arguments)]
155 pub async fn ensure_postgres(
156 &self,
157 db_instance_identifier: &str,
158 engine: &str,
159 engine_version: &str,
160 username: &str,
161 password: &str,
162 db_name: &str,
163 account_id: &str,
164 region: &str,
165 ) -> Result<RunningDbContainer, RuntimeError> {
166 if let Some(k) = &self.k8s {
167 let running = k
168 .ensure(
169 db_instance_identifier,
170 engine,
171 engine_version,
172 username,
173 password,
174 db_name,
175 account_id,
176 region,
177 )
178 .await?;
179 self.containers
180 .write()
181 .insert(db_instance_identifier.to_string(), running.clone());
182 return Ok(running);
183 }
184 self.stop_container(db_instance_identifier).await;
185
186 let (image, port, env_vars, bridge_engine_version) = match engine {
194 "postgres" => {
195 let major_version = engine_version.split('.').next().unwrap_or("16");
196 let image = self.ensure_postgres_image(major_version).await?;
197 let env_vars = vec![
198 format!("POSTGRES_USER={username}"),
199 format!("POSTGRES_PASSWORD={password}"),
200 format!("POSTGRES_DB={db_name}"),
201 format!(
202 "FAKECLOUD_ENDPOINT=http://host.docker.internal:{}",
203 self.server_port
204 ),
205 format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
206 format!("FAKECLOUD_REGION={region}"),
207 ];
208 (image, "5432", env_vars, Some(major_version.to_string()))
209 }
210 "mysql" => {
211 let _ = engine_version;
216 let major_version = "8.0";
217 let image = self.ensure_mysql_image(major_version).await?;
218 let env_vars = vec![
219 format!("MYSQL_ROOT_PASSWORD={password}"),
220 format!("MYSQL_USER={username}"),
221 format!("MYSQL_PASSWORD={password}"),
222 format!("MYSQL_DATABASE={db_name}"),
223 format!(
224 "FAKECLOUD_ENDPOINT=http://host.docker.internal:{}",
225 self.server_port
226 ),
227 format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
228 format!("FAKECLOUD_REGION={region}"),
229 ];
230 (image, "3306", env_vars, Some(major_version.to_string()))
231 }
232 "mariadb" => {
233 let major_version = if engine_version.starts_with("10.11") {
234 "10.11"
235 } else if engine_version.starts_with("11.4") {
236 "11.4"
237 } else {
238 "10.6"
239 };
240 let image = self.ensure_mariadb_image(major_version).await?;
241 let env_vars = vec![
242 format!("MARIADB_ROOT_PASSWORD={password}"),
243 format!("MARIADB_USER={username}"),
244 format!("MARIADB_PASSWORD={password}"),
245 format!("MARIADB_DATABASE={db_name}"),
246 format!(
247 "FAKECLOUD_ENDPOINT=http://host.docker.internal:{}",
248 self.server_port
249 ),
250 format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
251 format!("FAKECLOUD_REGION={region}"),
252 ];
253 (image, "3306", env_vars, Some(major_version.to_string()))
254 }
255 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
256 let image = "gvenzl/oracle-free:23-slim".to_string();
261 let env_vars = vec![
262 format!("ORACLE_PASSWORD={password}"),
263 format!("APP_USER={username}"),
264 format!("APP_USER_PASSWORD={password}"),
265 format!("ORACLE_DATABASE={db_name}"),
266 ];
267 (image, "1521", env_vars, None)
268 }
269 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
270 let image = "mcr.microsoft.com/mssql/server:2022-latest".to_string();
276 let env_vars = vec![
277 "ACCEPT_EULA=Y".to_string(),
278 format!("MSSQL_SA_PASSWORD={password}"),
279 "MSSQL_PID=Express".to_string(),
280 ];
281 (image, "1433", env_vars, None)
282 }
283 "db2-se" | "db2-ae" => {
284 let image = "icr.io/db2_community/db2:latest".to_string();
288 let env_vars = vec![
289 "LICENSE=accept".to_string(),
290 "DB2INSTANCE=db2inst1".to_string(),
291 format!("DB2INST1_PASSWORD={password}"),
292 format!("DBNAME={db_name}"),
293 ];
294 (image, "50000", env_vars, None)
295 }
296 _ => {
297 return Err(RuntimeError::ContainerStartFailed(format!(
298 "Unsupported engine: {}",
299 engine
300 )))
301 }
302 };
303
304 let needs_privileged = matches!(engine, "db2-se" | "db2-ae");
306
307 let mut args = vec![
309 "create".to_string(),
310 "-p".to_string(),
311 format!(":{}", port),
312 "--label".to_string(),
313 format!("fakecloud-rds={db_instance_identifier}"),
314 "--label".to_string(),
315 format!("fakecloud-instance={}", self.instance_id),
316 ];
317
318 if needs_privileged {
319 args.push("--privileged".to_string());
320 }
321
322 if bridge_engine_version.is_some() {
327 args.push("--add-host".to_string());
328 args.push(format!("host.docker.internal:{}", self.host_ip));
329 }
330
331 for env_var in env_vars {
332 args.push("-e".to_string());
333 args.push(env_var);
334 }
335
336 args.push(image);
337
338 let output = tokio::process::Command::new(&self.cli)
339 .args(&args)
340 .output()
341 .await
342 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
343
344 if !output.status.success() {
345 return Err(RuntimeError::ContainerStartFailed(
346 String::from_utf8_lossy(&output.stderr).trim().to_string(),
347 ));
348 }
349
350 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
351 let start_result = tokio::process::Command::new(&self.cli)
352 .args(["start", &container_id])
353 .output()
354 .await
355 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
356
357 if !start_result.status.success() {
358 self.remove_container(&container_id).await;
359 return Err(RuntimeError::ContainerStartFailed(format!(
360 "container start failed: {}",
361 String::from_utf8_lossy(&start_result.stderr).trim()
362 )));
363 }
364
365 let host_port = match self.lookup_port(&container_id, port).await {
366 Ok(host_port) => host_port,
367 Err(error) => {
368 self.remove_container(&container_id).await;
369 return Err(error);
370 }
371 };
372
373 let wait_result = match engine {
375 "postgres" => {
376 self.wait_for_postgres(username, password, db_name, host_port)
377 .await
378 }
379 "mysql" | "mariadb" => {
380 self.wait_for_mysql(username, password, db_name, host_port)
381 .await
382 }
383 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
384 self.wait_for_oracle(&container_id, host_port).await
385 }
386 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
387 self.wait_for_sqlserver(&container_id, host_port).await
388 }
389 "db2-se" | "db2-ae" => self.wait_for_db2(&container_id, host_port).await,
390 _ => unreachable!("engine already validated"),
391 };
392
393 if let Err(error) = wait_result {
394 self.remove_container(&container_id).await;
395 return Err(error);
396 }
397
398 let running = RunningDbContainer {
399 container_id,
400 host_port,
401 endpoint_address: "127.0.0.1".to_string(),
402 endpoint_port: host_port,
403 };
404 self.containers
405 .write()
406 .insert(db_instance_identifier.to_string(), running.clone());
407 Ok(running)
408 }
409
410 pub async fn stop_container(&self, db_instance_identifier: &str) {
411 let container = self.containers.write().remove(db_instance_identifier);
412 if let Some(container) = container {
413 if let Some(k) = &self.k8s {
414 k.delete_pod(&container.container_id).await;
415 } else {
416 self.remove_container(&container.container_id).await;
417 }
418 }
419 }
420
421 pub async fn restart_container(
422 &self,
423 db_instance_identifier: &str,
424 engine: &str,
425 username: &str,
426 password: &str,
427 db_name: &str,
428 ) -> Result<RunningDbContainer, RuntimeError> {
429 if let Some(k) = &self.k8s {
430 let running = k
431 .restart(db_instance_identifier, engine, username, password, db_name)
432 .await?;
433 self.containers
434 .write()
435 .insert(db_instance_identifier.to_string(), running.clone());
436 return Ok(running);
437 }
438 let running = self
439 .containers
440 .read()
441 .get(db_instance_identifier)
442 .cloned()
443 .ok_or(RuntimeError::Unavailable)?;
444
445 let output = tokio::process::Command::new(&self.cli)
446 .args(["restart", &running.container_id])
447 .output()
448 .await
449 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
450
451 if !output.status.success() {
452 return Err(RuntimeError::ContainerStartFailed(format!(
453 "container restart failed: {}",
454 String::from_utf8_lossy(&output.stderr).trim()
455 )));
456 }
457
458 let port = match engine {
459 "postgres" => "5432",
460 "mysql" | "mariadb" => "3306",
461 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "1521",
462 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "1433",
463 "db2-se" | "db2-ae" => "50000",
464 _ => "5432", };
466
467 let host_port = self.lookup_port(&running.container_id, port).await?;
468
469 match engine {
470 "postgres" => {
471 self.wait_for_postgres(username, password, db_name, host_port)
472 .await?
473 }
474 "mysql" | "mariadb" => {
475 self.wait_for_mysql(username, password, db_name, host_port)
476 .await?
477 }
478 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
479 self.wait_for_oracle(&running.container_id, host_port)
480 .await?
481 }
482 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
483 self.wait_for_sqlserver(&running.container_id, host_port)
484 .await?
485 }
486 "db2-se" | "db2-ae" => self.wait_for_db2(&running.container_id, host_port).await?,
487 _ => {
488 self.wait_for_postgres(username, password, db_name, host_port)
489 .await?
490 }
491 };
492 let running = RunningDbContainer {
493 container_id: running.container_id,
494 host_port,
495 endpoint_address: "127.0.0.1".to_string(),
496 endpoint_port: host_port,
497 };
498 self.containers
499 .write()
500 .insert(db_instance_identifier.to_string(), running.clone());
501 Ok(running)
502 }
503
504 pub async fn stop_all(&self) {
505 let containers: Vec<String> = {
506 let mut containers = self.containers.write();
507 containers
508 .drain()
509 .map(|(_, container)| container.container_id)
510 .collect()
511 };
512 for container_id in containers {
513 self.remove_container(&container_id).await;
514 }
515 }
516
517 async fn lookup_port(&self, container_id: &str, port: &str) -> Result<u16, RuntimeError> {
518 let port_output = tokio::process::Command::new(&self.cli)
519 .args(["port", container_id, port])
520 .output()
521 .await
522 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
523
524 let port_str = String::from_utf8_lossy(&port_output.stdout);
525 port_str
526 .trim()
527 .rsplit(':')
528 .next()
529 .and_then(|value| value.parse::<u16>().ok())
530 .ok_or_else(|| {
531 RuntimeError::ContainerStartFailed(format!(
532 "could not determine container port from '{}'",
533 port_str.trim()
534 ))
535 })
536 }
537
538 async fn wait_for_postgres(
539 &self,
540 username: &str,
541 password: &str,
542 db_name: &str,
543 host_port: u16,
544 ) -> Result<(), RuntimeError> {
545 for _ in 0..40 {
546 tokio::time::sleep(Duration::from_millis(500)).await;
547 let connection_string = format!(
548 "host=127.0.0.1 port={host_port} user={username} password={password} dbname={db_name}"
549 );
550 if let Ok((client, connection)) =
551 tokio_postgres::connect(&connection_string, NoTls).await
552 {
553 tokio::spawn(async move {
554 let _ = connection.await;
555 });
556 if client.simple_query("SELECT 1").await.is_ok() {
557 return Ok(());
558 }
559 }
560 }
561
562 Err(RuntimeError::ContainerStartFailed(
563 "postgres container did not become ready within 20 seconds".to_string(),
564 ))
565 }
566
567 async fn wait_for_mysql(
568 &self,
569 username: &str,
570 password: &str,
571 db_name: &str,
572 host_port: u16,
573 ) -> Result<(), RuntimeError> {
574 use mysql_async::prelude::*;
575 use mysql_async::OptsBuilder;
576
577 for attempt in 1..=40 {
578 let opts = OptsBuilder::default()
579 .ip_or_hostname("127.0.0.1")
580 .tcp_port(host_port)
581 .user(Some(username))
582 .pass(Some(password))
583 .db_name(Some(db_name));
584
585 match mysql_async::Conn::new(opts).await {
586 Ok(mut conn) => {
587 if conn.query_drop("SELECT 1").await.is_ok() {
588 let _ = conn.disconnect().await;
589 return Ok(());
590 }
591 }
592 Err(_) => {
593 if attempt < 40 {
594 tokio::time::sleep(Duration::from_millis(500)).await;
595 }
596 continue;
597 }
598 }
599 }
600
601 Err(RuntimeError::ContainerStartFailed(
602 "MySQL/MariaDB container did not become ready within 20 seconds".to_string(),
603 ))
604 }
605
606 async fn wait_for_oracle(
612 &self,
613 container_id: &str,
614 host_port: u16,
615 ) -> Result<(), RuntimeError> {
616 self.wait_for_log_marker(container_id, "DATABASE IS READY TO USE!", 240)
617 .await?;
618 self.wait_for_tcp(host_port, 30).await
619 }
620
621 async fn wait_for_sqlserver(
625 &self,
626 container_id: &str,
627 host_port: u16,
628 ) -> Result<(), RuntimeError> {
629 self.wait_for_log_marker(
630 container_id,
631 "SQL Server is now ready for client connections",
632 180,
633 )
634 .await?;
635 self.wait_for_tcp(host_port, 30).await
636 }
637
638 async fn wait_for_db2(&self, container_id: &str, host_port: u16) -> Result<(), RuntimeError> {
642 self.wait_for_log_marker(container_id, "Setup has completed", 360)
643 .await?;
644 self.wait_for_tcp(host_port, 60).await
645 }
646
647 async fn wait_for_log_marker(
650 &self,
651 container_id: &str,
652 marker: &str,
653 deadline_secs: u64,
654 ) -> Result<(), RuntimeError> {
655 let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
656 while std::time::Instant::now() < deadline {
657 let output = tokio::process::Command::new(&self.cli)
658 .args(["logs", container_id])
659 .output()
660 .await
661 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
662 let stdout = String::from_utf8_lossy(&output.stdout);
663 let stderr = String::from_utf8_lossy(&output.stderr);
664 if stdout.contains(marker) || stderr.contains(marker) {
665 return Ok(());
666 }
667 tokio::time::sleep(Duration::from_secs(2)).await;
668 }
669 Err(RuntimeError::ContainerStartFailed(format!(
670 "container did not log '{}' within {} seconds",
671 marker, deadline_secs
672 )))
673 }
674
675 async fn wait_for_tcp(&self, host_port: u16, deadline_secs: u64) -> Result<(), RuntimeError> {
679 let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
680 while std::time::Instant::now() < deadline {
681 if tokio::net::TcpStream::connect(("127.0.0.1", host_port))
682 .await
683 .is_ok()
684 {
685 return Ok(());
686 }
687 tokio::time::sleep(Duration::from_millis(500)).await;
688 }
689 Err(RuntimeError::ContainerStartFailed(format!(
690 "TCP probe to 127.0.0.1:{} did not succeed within {}s",
691 host_port, deadline_secs
692 )))
693 }
694
695 async fn remove_container(&self, container_id: &str) {
696 let _ = tokio::process::Command::new(&self.cli)
697 .args(["rm", "-f", container_id])
698 .output()
699 .await;
700 }
701
702 pub(crate) async fn ensure_postgres_image(
723 &self,
724 major_version: &str,
725 ) -> Result<String, RuntimeError> {
726 let tag = bridge_image_tag("fakecloud-postgres", major_version);
727 self.ensure_bridge_image(&tag, |tag| async move {
728 self.build_postgres_image_local(major_version, &tag).await
729 })
730 .await
731 }
732
733 async fn docker_image_exists(&self, tag: &str) -> bool {
734 tokio::process::Command::new(&self.cli)
735 .args(["image", "inspect", tag])
736 .stdout(std::process::Stdio::null())
737 .stderr(std::process::Stdio::null())
738 .status()
739 .await
740 .map(|status| status.success())
741 .unwrap_or(false)
742 }
743
744 async fn try_pull_image(&self, tag: &str) -> bool {
745 tracing::info!(tag = %tag, "Pulling prebuilt fakecloud-postgres image");
746 let output = match tokio::process::Command::new(&self.cli)
747 .args(["pull", tag])
748 .output()
749 .await
750 {
751 Ok(output) => output,
752 Err(e) => {
753 tracing::debug!(tag = %tag, error = %e, "docker pull failed to spawn");
754 return false;
755 }
756 };
757 if output.status.success() {
758 return true;
759 }
760 tracing::info!(
761 tag = %tag,
762 stderr = %String::from_utf8_lossy(&output.stderr).trim(),
763 "Prebuilt postgres image not available, falling back to local build"
764 );
765 false
766 }
767
768 async fn build_postgres_image_local(
769 &self,
770 major_version: &str,
771 tag: &str,
772 ) -> Result<(), RuntimeError> {
773 let assets: [(&str, &str); 8] = [
774 ("Dockerfile", POSTGRES_DOCKERFILE),
775 ("aws_commons.control", AWS_COMMONS_CONTROL),
776 ("aws_commons--1.1.sql", AWS_COMMONS_SQL),
777 ("aws_commons--1.0--1.1.sql", AWS_COMMONS_UPGRADE_SQL),
778 ("aws_lambda.control", AWS_LAMBDA_CONTROL),
779 ("aws_lambda--1.0.sql", AWS_LAMBDA_SQL),
780 ("aws_s3.control", AWS_S3_CONTROL),
781 ("aws_s3--1.0.sql", AWS_S3_SQL),
782 ];
783 self.build_image_local(
784 tag,
785 &assets,
786 &format!("PG_VERSION={major_version}"),
787 "fakecloud-postgres",
788 )
789 .await
790 }
791
792 pub(crate) async fn ensure_mysql_image(
797 &self,
798 major_version: &str,
799 ) -> Result<String, RuntimeError> {
800 let tag = bridge_image_tag("fakecloud-mysql", major_version);
801 self.ensure_bridge_image(&tag, |tag| async move {
802 self.build_mysql_image_local(major_version, &tag).await
803 })
804 .await
805 }
806
807 pub(crate) async fn ensure_mariadb_image(
808 &self,
809 major_version: &str,
810 ) -> Result<String, RuntimeError> {
811 let tag = bridge_image_tag("fakecloud-mariadb", major_version);
812 self.ensure_bridge_image(&tag, |tag| async move {
813 self.build_mariadb_image_local(major_version, &tag).await
814 })
815 .await
816 }
817
818 async fn ensure_bridge_image<F, Fut>(
823 &self,
824 tag: &str,
825 build_local: F,
826 ) -> Result<String, RuntimeError>
827 where
828 F: FnOnce(String) -> Fut,
829 Fut: std::future::Future<Output = Result<(), RuntimeError>>,
830 {
831 let lock = {
832 let mut cache = self.image_cache.write();
833 cache
834 .entry(tag.to_string())
835 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(false)))
836 .clone()
837 };
838 let mut resolved = lock.lock().await;
839 if *resolved {
840 return Ok(tag.to_string());
841 }
842
843 let force_rebuild = std::env::var("FAKECLOUD_REBUILD_POSTGRES_IMAGE")
844 .map(|v| !v.is_empty())
845 .unwrap_or(false);
846
847 if !force_rebuild {
848 if self.docker_image_exists(tag).await {
849 *resolved = true;
850 return Ok(tag.to_string());
851 }
852 if self.try_pull_image(tag).await {
853 *resolved = true;
854 return Ok(tag.to_string());
855 }
856 }
857
858 build_local(tag.to_string()).await?;
859 *resolved = true;
860 Ok(tag.to_string())
861 }
862
863 async fn build_mysql_image_local(
864 &self,
865 major_version: &str,
866 tag: &str,
867 ) -> Result<(), RuntimeError> {
868 let assets: [(&str, &str); 4] = [
869 ("Dockerfile", MYSQL_DOCKERFILE),
870 ("fakecloud_udf.c", MYSQL_UDF_C),
871 ("fakecloud-bootstrap.sh", MYSQL_BOOTSTRAP_SH),
872 ("99-fakecloud-bootstrap.sql.tmpl", MYSQL_BOOTSTRAP_SQL),
873 ];
874 self.build_image_local(
875 tag,
876 &assets,
877 &format!("MYSQL_VERSION={major_version}"),
878 "fakecloud-mysql",
879 )
880 .await
881 }
882
883 async fn build_mariadb_image_local(
884 &self,
885 major_version: &str,
886 tag: &str,
887 ) -> Result<(), RuntimeError> {
888 let assets: [(&str, &str); 4] = [
889 ("Dockerfile", MARIADB_DOCKERFILE),
890 ("fakecloud_udf.c", MARIADB_UDF_C),
891 ("fakecloud-bootstrap.sh", MARIADB_BOOTSTRAP_SH),
892 ("99-fakecloud-bootstrap.sql.tmpl", MARIADB_BOOTSTRAP_SQL),
893 ];
894 self.build_image_local(
895 tag,
896 &assets,
897 &format!("MARIADB_VERSION={major_version}"),
898 "fakecloud-mariadb",
899 )
900 .await
901 }
902
903 async fn build_image_local(
904 &self,
905 tag: &str,
906 assets: &[(&str, &str)],
907 build_arg: &str,
908 image_label: &str,
909 ) -> Result<(), RuntimeError> {
910 let build_dir =
911 tempfile::tempdir().map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
912 for (name, contents) in assets {
913 tokio::fs::write(build_dir.path().join(name), contents)
914 .await
915 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
916 }
917
918 tracing::info!(
919 tag = %tag,
920 image = %image_label,
921 "Building {image_label} image locally (first use can take ~60s)"
922 );
923
924 let output = tokio::process::Command::new(&self.cli)
925 .args(["build", "--build-arg", build_arg, "-t", tag, "."])
926 .current_dir(build_dir.path())
927 .output()
928 .await
929 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
930
931 if !output.status.success() {
932 return Err(RuntimeError::ContainerStartFailed(format!(
933 "docker build for {} failed: {}",
934 tag,
935 String::from_utf8_lossy(&output.stderr).trim()
936 )));
937 }
938
939 Ok(())
940 }
941
942 pub async fn dump_database(
943 &self,
944 db_instance_identifier: &str,
945 engine: &str,
946 username: &str,
947 password: &str,
948 db_name: &str,
949 ) -> Result<Vec<u8>, RuntimeError> {
950 let container = self
951 .containers
952 .read()
953 .get(db_instance_identifier)
954 .cloned()
955 .ok_or(RuntimeError::Unavailable)?;
956
957 if let Some(k) = &self.k8s {
958 return k
959 .dump_database(&container.container_id, engine, username, password, db_name)
960 .await;
961 }
962
963 let args: Vec<String> = match engine {
964 "mysql" | "mariadb" => vec![
965 "exec".into(),
966 container.container_id.clone(),
967 "mysqldump".into(),
968 "-u".into(),
969 username.into(),
970 format!("-p{password}"),
971 db_name.into(),
972 ],
973 "postgres" => vec![
974 "exec".into(),
975 container.container_id.clone(),
976 "pg_dump".into(),
977 "-U".into(),
978 username.into(),
979 "-d".into(),
980 db_name.into(),
981 "--no-password".into(),
982 ],
983 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
990 | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
991 return Err(RuntimeError::ContainerStartFailed(format!(
992 "engine {engine} is not yet supported by the snapshot/read-replica path; \
993 emulator stores the API state but cannot dump the underlying database"
994 )));
995 }
996 other => {
997 return Err(RuntimeError::ContainerStartFailed(format!(
998 "engine {other} is not supported by dump_database"
999 )));
1000 }
1001 };
1002
1003 let output = tokio::process::Command::new(&self.cli)
1004 .args(&args)
1005 .output()
1006 .await
1007 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1008
1009 if !output.status.success() {
1010 return Err(RuntimeError::ContainerStartFailed(format!(
1011 "dump failed: {}",
1012 String::from_utf8_lossy(&output.stderr).trim()
1013 )));
1014 }
1015
1016 Ok(output.stdout)
1017 }
1018
1019 pub async fn read_log_file(
1025 &self,
1026 db_instance_identifier: &str,
1027 container_path: &str,
1028 ) -> Result<Vec<u8>, RuntimeError> {
1029 let container = self
1030 .containers
1031 .read()
1032 .get(db_instance_identifier)
1033 .cloned()
1034 .ok_or(RuntimeError::Unavailable)?;
1035
1036 if let Some(k) = &self.k8s {
1037 return k.read_file(&container.container_id, container_path).await;
1038 }
1039
1040 let output = tokio::process::Command::new(&self.cli)
1041 .args(["exec", &container.container_id, "cat", container_path])
1042 .output()
1043 .await
1044 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1045
1046 if !output.status.success() {
1047 return Err(RuntimeError::ContainerStartFailed(format!(
1048 "cat {container_path} failed: {}",
1049 String::from_utf8_lossy(&output.stderr).trim()
1050 )));
1051 }
1052
1053 Ok(output.stdout)
1054 }
1055
1056 pub async fn restore_database(
1057 &self,
1058 db_instance_identifier: &str,
1059 engine: &str,
1060 username: &str,
1061 password: &str,
1062 db_name: &str,
1063 dump_data: &[u8],
1064 ) -> Result<(), RuntimeError> {
1065 let container = self
1066 .containers
1067 .read()
1068 .get(db_instance_identifier)
1069 .cloned()
1070 .ok_or(RuntimeError::Unavailable)?;
1071
1072 if let Some(k) = &self.k8s {
1073 return k
1074 .restore_database(
1075 &container.container_id,
1076 engine,
1077 username,
1078 password,
1079 db_name,
1080 dump_data,
1081 )
1082 .await;
1083 }
1084
1085 let args: Vec<String> = match engine {
1086 "mysql" | "mariadb" => vec![
1087 "exec".into(),
1088 "-i".into(),
1089 container.container_id.clone(),
1090 "mysql".into(),
1091 "-u".into(),
1092 username.into(),
1093 format!("-p{password}"),
1094 db_name.into(),
1095 ],
1096 "postgres" => vec![
1097 "exec".into(),
1098 "-i".into(),
1099 container.container_id.clone(),
1100 "psql".into(),
1101 "-U".into(),
1102 username.into(),
1103 "-d".into(),
1104 db_name.into(),
1105 "--no-password".into(),
1106 "-v".into(),
1107 "ON_ERROR_STOP=1".into(),
1108 ],
1109 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
1110 | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
1111 return Err(RuntimeError::ContainerStartFailed(format!(
1112 "engine {engine} is not yet supported by the snapshot-restore path"
1113 )));
1114 }
1115 other => {
1116 return Err(RuntimeError::ContainerStartFailed(format!(
1117 "engine {other} is not supported by restore_database"
1118 )));
1119 }
1120 };
1121
1122 let mut child = tokio::process::Command::new(&self.cli)
1123 .args(&args)
1124 .stdin(std::process::Stdio::piped())
1125 .stdout(std::process::Stdio::piped())
1126 .stderr(std::process::Stdio::piped())
1127 .spawn()
1128 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1129
1130 if let Some(mut stdin) = child.stdin.take() {
1131 use tokio::io::AsyncWriteExt;
1132 stdin
1133 .write_all(dump_data)
1134 .await
1135 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1136 drop(stdin);
1137 }
1138
1139 let output = child
1140 .wait_with_output()
1141 .await
1142 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1143
1144 if !output.status.success() {
1145 return Err(RuntimeError::ContainerStartFailed(format!(
1146 "restore failed: {}",
1147 String::from_utf8_lossy(&output.stderr).trim()
1148 )));
1149 }
1150
1151 Ok(())
1152 }
1153}
1154
1155fn cli_available(cli: &str) -> bool {
1156 std::process::Command::new(cli)
1157 .arg("info")
1158 .stdout(std::process::Stdio::null())
1159 .stderr(std::process::Stdio::null())
1160 .status()
1161 .map(|status| status.success())
1162 .unwrap_or(false)
1163}
1164
1165fn detect_bridge_gateway(cli: &str) -> Option<String> {
1169 let output = std::process::Command::new(cli)
1170 .args([
1171 "network",
1172 "inspect",
1173 "bridge",
1174 "--format",
1175 "{{range .IPAM.Config}}{{.Gateway}}{{end}}",
1176 ])
1177 .output()
1178 .ok()?;
1179 if !output.status.success() {
1180 return None;
1181 }
1182 let gateway = String::from_utf8_lossy(&output.stdout).trim().to_string();
1183 if gateway.is_empty() || !gateway.contains('.') {
1184 return None;
1185 }
1186 Some(gateway)
1187}
1188
1189pub(crate) fn bridge_image_tag(image: &str, major_version: &str) -> String {
1198 let registry = std::env::var("FAKECLOUD_POSTGRES_REGISTRY")
1199 .unwrap_or_else(|_| DEFAULT_POSTGRES_REGISTRY.to_string());
1200 let registry = registry.trim_end_matches('/');
1201 format!(
1202 "{}/{}:{}-{}",
1203 registry,
1204 image,
1205 major_version,
1206 env!("CARGO_PKG_VERSION")
1207 )
1208}
1209
1210#[cfg(test)]
1211mod tests {
1212 use super::*;
1213
1214 #[test]
1218 fn bridge_image_tag_resolves_registry_overrides() {
1219 let prev = std::env::var("FAKECLOUD_POSTGRES_REGISTRY").ok();
1220
1221 std::env::remove_var("FAKECLOUD_POSTGRES_REGISTRY");
1222 assert_eq!(
1223 bridge_image_tag("fakecloud-postgres", "16"),
1224 format!(
1225 "ghcr.io/faiscadev/fakecloud-postgres:16-{}",
1226 env!("CARGO_PKG_VERSION")
1227 )
1228 );
1229 assert_eq!(
1230 bridge_image_tag("fakecloud-mysql", "8.0"),
1231 format!(
1232 "ghcr.io/faiscadev/fakecloud-mysql:8.0-{}",
1233 env!("CARGO_PKG_VERSION")
1234 )
1235 );
1236 assert_eq!(
1237 bridge_image_tag("fakecloud-mariadb", "10.11"),
1238 format!(
1239 "ghcr.io/faiscadev/fakecloud-mariadb:10.11-{}",
1240 env!("CARGO_PKG_VERSION")
1241 )
1242 );
1243
1244 std::env::set_var("FAKECLOUD_POSTGRES_REGISTRY", "registry.example.com/team");
1245 assert_eq!(
1246 bridge_image_tag("fakecloud-postgres", "15"),
1247 format!(
1248 "registry.example.com/team/fakecloud-postgres:15-{}",
1249 env!("CARGO_PKG_VERSION")
1250 )
1251 );
1252
1253 std::env::set_var("FAKECLOUD_POSTGRES_REGISTRY", "registry.example.com/team/");
1254 assert_eq!(
1255 bridge_image_tag("fakecloud-postgres", "13"),
1256 format!(
1257 "registry.example.com/team/fakecloud-postgres:13-{}",
1258 env!("CARGO_PKG_VERSION")
1259 )
1260 );
1261
1262 match prev {
1263 Some(v) => std::env::set_var("FAKECLOUD_POSTGRES_REGISTRY", v),
1264 None => std::env::remove_var("FAKECLOUD_POSTGRES_REGISTRY"),
1265 }
1266 }
1267}