1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fakecloud_core::container_net::{detect_container_cli, HostNetworking};
6use parking_lot::RwLock;
7use tokio_postgres::NoTls;
8
9mod k8s;
10
11const POSTGRES_DOCKERFILE: &str = include_str!("../../assets/postgres/Dockerfile");
12const AWS_COMMONS_CONTROL: &str = include_str!("../../assets/postgres/aws_commons.control");
13const AWS_COMMONS_SQL: &str = include_str!("../../assets/postgres/aws_commons--1.1.sql");
14const AWS_COMMONS_UPGRADE_SQL: &str =
15 include_str!("../../assets/postgres/aws_commons--1.0--1.1.sql");
16const AWS_LAMBDA_CONTROL: &str = include_str!("../../assets/postgres/aws_lambda.control");
17const AWS_LAMBDA_SQL: &str = include_str!("../../assets/postgres/aws_lambda--1.0.sql");
18const AWS_S3_CONTROL: &str = include_str!("../../assets/postgres/aws_s3.control");
19const AWS_S3_SQL: &str = include_str!("../../assets/postgres/aws_s3--1.0.sql");
20
21const MYSQL_DOCKERFILE: &str = include_str!("../../assets/mysql/Dockerfile");
22const MYSQL_UDF_C: &str = include_str!("../../assets/mysql/fakecloud_udf.c");
23const MYSQL_BOOTSTRAP_SH: &str = include_str!("../../assets/mysql/fakecloud-bootstrap.sh");
24const MYSQL_BOOTSTRAP_SQL: &str =
25 include_str!("../../assets/mysql/99-fakecloud-bootstrap.sql.tmpl");
26
27const MARIADB_DOCKERFILE: &str = include_str!("../../assets/mariadb/Dockerfile");
28const MARIADB_UDF_C: &str = include_str!("../../assets/mariadb/fakecloud_udf.c");
29const MARIADB_BOOTSTRAP_SH: &str = include_str!("../../assets/mariadb/fakecloud-bootstrap.sh");
30const MARIADB_BOOTSTRAP_SQL: &str =
31 include_str!("../../assets/mariadb/99-fakecloud-bootstrap.sql.tmpl");
32
33const DEFAULT_POSTGRES_REGISTRY: &str = "ghcr.io/faiscadev";
40
/// Bookkeeping record for a database container (or pod) that this runtime has started.
#[derive(Clone, Debug)]
pub struct RunningDbContainer {
    /// Identifier handed back to the runtime for follow-up commands (`restart`, `logs`,
    /// `exec`, `rm -f`), or to the Kubernetes backend's `delete_pod`.
    pub container_id: String,
    /// Host-side port the database port was published on.
    pub host_port: u16,
    /// Address clients should use to reach the database.
    pub endpoint_address: String,
    /// Port clients should use to reach the database. The CLI code path in this file always
    /// sets it equal to `host_port`.
    pub endpoint_port: u16,
}
54
55pub struct RdsRuntime {
56 cli: String,
57 containers: RwLock<HashMap<String, RunningDbContainer>>,
58 instance_id: String,
59 net: HostNetworking,
65 server_port: u16,
66 image_cache: RwLock<HashMap<String, Arc<tokio::sync::Mutex<bool>>>>,
67 k8s: Option<k8s::K8sDb>,
71}
72
73#[derive(Debug, thiserror::Error)]
74pub enum RuntimeError {
75 #[error("container runtime is unavailable")]
76 Unavailable,
77 #[error("container failed to start: {0}")]
78 ContainerStartFailed(String),
79}
80
81#[derive(Debug, thiserror::Error)]
85pub enum BackendInitError {
86 #[error(transparent)]
87 Env(#[from] fakecloud_k8s::K8sEnvError),
88 #[error("failed to connect to the Kubernetes cluster: {0}")]
89 Connect(String),
90}
91
92impl RdsRuntime {
93 pub fn new(server_port: u16) -> Option<Self> {
94 let cli = detect_container_cli()?;
101 let net = HostNetworking::detect(&cli);
102
103 Some(Self {
104 cli,
105 containers: RwLock::new(HashMap::new()),
106 instance_id: format!("fakecloud-{}", std::process::id()),
107 net,
108 server_port,
109 image_cache: RwLock::new(HashMap::new()),
110 k8s: None,
111 })
112 }
113
114 pub async fn new_k8s(server_port: u16) -> Result<Self, BackendInitError> {
118 let db = k8s::K8sDb::from_env(server_port).await?;
119 Ok(Self {
120 cli: String::new(),
121 containers: RwLock::new(HashMap::new()),
122 instance_id: format!("fakecloud-{}", std::process::id()),
123 net: HostNetworking {
127 host_alias: String::new(),
128 add_host_arg: None,
129 sibling_host: "127.0.0.1".to_string(),
130 },
131 server_port,
132 image_cache: RwLock::new(HashMap::new()),
133 k8s: Some(db),
134 })
135 }
136
137 pub fn cli_name(&self) -> &str {
138 if self.k8s.is_some() {
139 "kubernetes"
140 } else {
141 &self.cli
142 }
143 }
144
145 pub async fn reap_stale(&self) {
148 if let Some(k) = &self.k8s {
149 k.reap_stale().await;
150 }
151 }
152
153 #[allow(clippy::too_many_arguments)]
154 pub async fn ensure_postgres(
155 &self,
156 db_instance_identifier: &str,
157 engine: &str,
158 engine_version: &str,
159 username: &str,
160 password: &str,
161 db_name: &str,
162 account_id: &str,
163 region: &str,
164 ) -> Result<RunningDbContainer, RuntimeError> {
165 if let Some(k) = &self.k8s {
166 let running = k
167 .ensure(
168 db_instance_identifier,
169 engine,
170 engine_version,
171 username,
172 password,
173 db_name,
174 account_id,
175 region,
176 )
177 .await?;
178 self.containers
179 .write()
180 .insert(db_instance_identifier.to_string(), running.clone());
181 return Ok(running);
182 }
183 self.stop_container(db_instance_identifier).await;
184
185 let (image, port, env_vars, bridge_engine_version) = match engine {
193 "postgres" => {
194 let major_version = engine_version.split('.').next().unwrap_or("16");
195 let image = self.ensure_postgres_image(major_version).await?;
196 let env_vars = vec![
197 format!("POSTGRES_USER={username}"),
198 format!("POSTGRES_PASSWORD={password}"),
199 format!("POSTGRES_DB={db_name}"),
200 format!(
201 "FAKECLOUD_ENDPOINT=http://{}:{}",
202 self.net.host_alias, self.server_port
203 ),
204 format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
205 format!("FAKECLOUD_REGION={region}"),
206 ];
207 (image, "5432", env_vars, Some(major_version.to_string()))
208 }
209 "mysql" => {
210 let _ = engine_version;
215 let major_version = "8.0";
216 let image = self.ensure_mysql_image(major_version).await?;
217 let env_vars = vec![
218 format!("MYSQL_ROOT_PASSWORD={password}"),
219 format!("MYSQL_USER={username}"),
220 format!("MYSQL_PASSWORD={password}"),
221 format!("MYSQL_DATABASE={db_name}"),
222 format!(
223 "FAKECLOUD_ENDPOINT=http://{}:{}",
224 self.net.host_alias, self.server_port
225 ),
226 format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
227 format!("FAKECLOUD_REGION={region}"),
228 ];
229 (image, "3306", env_vars, Some(major_version.to_string()))
230 }
231 "mariadb" => {
232 let major_version = if engine_version.starts_with("10.11") {
233 "10.11"
234 } else if engine_version.starts_with("11.4") {
235 "11.4"
236 } else {
237 "10.6"
238 };
239 let image = self.ensure_mariadb_image(major_version).await?;
240 let env_vars = vec![
241 format!("MARIADB_ROOT_PASSWORD={password}"),
242 format!("MARIADB_USER={username}"),
243 format!("MARIADB_PASSWORD={password}"),
244 format!("MARIADB_DATABASE={db_name}"),
245 format!(
246 "FAKECLOUD_ENDPOINT=http://{}:{}",
247 self.net.host_alias, self.server_port
248 ),
249 format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
250 format!("FAKECLOUD_REGION={region}"),
251 ];
252 (image, "3306", env_vars, Some(major_version.to_string()))
253 }
254 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
255 let image = "gvenzl/oracle-free:23-slim".to_string();
260 let env_vars = vec![
261 format!("ORACLE_PASSWORD={password}"),
262 format!("APP_USER={username}"),
263 format!("APP_USER_PASSWORD={password}"),
264 format!("ORACLE_DATABASE={db_name}"),
265 ];
266 (image, "1521", env_vars, None)
267 }
268 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
269 let image = "mcr.microsoft.com/mssql/server:2022-latest".to_string();
275 let env_vars = vec![
276 "ACCEPT_EULA=Y".to_string(),
277 format!("MSSQL_SA_PASSWORD={password}"),
278 "MSSQL_PID=Express".to_string(),
279 ];
280 (image, "1433", env_vars, None)
281 }
282 "db2-se" | "db2-ae" => {
283 let image = "icr.io/db2_community/db2:latest".to_string();
287 let env_vars = vec![
288 "LICENSE=accept".to_string(),
289 "DB2INSTANCE=db2inst1".to_string(),
290 format!("DB2INST1_PASSWORD={password}"),
291 format!("DBNAME={db_name}"),
292 ];
293 (image, "50000", env_vars, None)
294 }
295 _ => {
296 return Err(RuntimeError::ContainerStartFailed(format!(
297 "Unsupported engine: {}",
298 engine
299 )))
300 }
301 };
302
303 let needs_privileged = matches!(engine, "db2-se" | "db2-ae");
305
306 let mut args = vec![
308 "create".to_string(),
309 "-p".to_string(),
310 format!(":{}", port),
311 "--label".to_string(),
312 format!("fakecloud-rds={db_instance_identifier}"),
313 "--label".to_string(),
314 format!("fakecloud-instance={}", self.instance_id),
315 ];
316
317 if needs_privileged {
318 args.push("--privileged".to_string());
319 }
320
321 if bridge_engine_version.is_some() {
328 self.net.push_add_host_args(&mut args);
329 }
330
331 for env_var in env_vars {
332 args.push("-e".to_string());
333 args.push(env_var);
334 }
335
336 args.push(image);
337
338 let output = tokio::process::Command::new(&self.cli)
339 .args(&args)
340 .output()
341 .await
342 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
343
344 if !output.status.success() {
345 return Err(RuntimeError::ContainerStartFailed(
346 String::from_utf8_lossy(&output.stderr).trim().to_string(),
347 ));
348 }
349
350 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
351 let start_result = tokio::process::Command::new(&self.cli)
352 .args(["start", &container_id])
353 .output()
354 .await
355 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
356
357 if !start_result.status.success() {
358 self.remove_container(&container_id).await;
359 return Err(RuntimeError::ContainerStartFailed(format!(
360 "container start failed: {}",
361 String::from_utf8_lossy(&start_result.stderr).trim()
362 )));
363 }
364
365 let host_port = match self.lookup_port(&container_id, port).await {
366 Ok(host_port) => host_port,
367 Err(error) => {
368 self.remove_container(&container_id).await;
369 return Err(error);
370 }
371 };
372
373 let wait_result = match engine {
375 "postgres" => {
376 self.wait_for_postgres(username, password, db_name, host_port)
377 .await
378 }
379 "mysql" | "mariadb" => {
380 self.wait_for_mysql(username, password, db_name, host_port)
381 .await
382 }
383 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
384 self.wait_for_oracle(&container_id, host_port).await
385 }
386 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
387 self.wait_for_sqlserver(&container_id, host_port).await
388 }
389 "db2-se" | "db2-ae" => self.wait_for_db2(&container_id, host_port).await,
390 _ => unreachable!("engine already validated"),
391 };
392
393 if let Err(error) = wait_result {
394 self.remove_container(&container_id).await;
395 return Err(error);
396 }
397
398 let running = RunningDbContainer {
399 container_id,
400 host_port,
401 endpoint_address: self.net.sibling_host.clone(),
405 endpoint_port: host_port,
406 };
407 self.containers
408 .write()
409 .insert(db_instance_identifier.to_string(), running.clone());
410 Ok(running)
411 }
412
413 pub async fn stop_container(&self, db_instance_identifier: &str) {
414 let container = self.containers.write().remove(db_instance_identifier);
415 if let Some(container) = container {
416 if let Some(k) = &self.k8s {
417 k.delete_pod(&container.container_id).await;
418 } else {
419 self.remove_container(&container.container_id).await;
420 }
421 }
422 }
423
424 pub async fn restart_container(
425 &self,
426 db_instance_identifier: &str,
427 engine: &str,
428 username: &str,
429 password: &str,
430 db_name: &str,
431 ) -> Result<RunningDbContainer, RuntimeError> {
432 if let Some(k) = &self.k8s {
433 let running = k
434 .restart(db_instance_identifier, engine, username, password, db_name)
435 .await?;
436 self.containers
437 .write()
438 .insert(db_instance_identifier.to_string(), running.clone());
439 return Ok(running);
440 }
441 let running = self
442 .containers
443 .read()
444 .get(db_instance_identifier)
445 .cloned()
446 .ok_or(RuntimeError::Unavailable)?;
447
448 let output = tokio::process::Command::new(&self.cli)
449 .args(["restart", &running.container_id])
450 .output()
451 .await
452 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
453
454 if !output.status.success() {
455 return Err(RuntimeError::ContainerStartFailed(format!(
456 "container restart failed: {}",
457 String::from_utf8_lossy(&output.stderr).trim()
458 )));
459 }
460
461 let port = match engine {
462 "postgres" => "5432",
463 "mysql" | "mariadb" => "3306",
464 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "1521",
465 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "1433",
466 "db2-se" | "db2-ae" => "50000",
467 _ => "5432", };
469
470 let host_port = self.lookup_port(&running.container_id, port).await?;
471
472 match engine {
473 "postgres" => {
474 self.wait_for_postgres(username, password, db_name, host_port)
475 .await?
476 }
477 "mysql" | "mariadb" => {
478 self.wait_for_mysql(username, password, db_name, host_port)
479 .await?
480 }
481 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
482 self.wait_for_oracle(&running.container_id, host_port)
483 .await?
484 }
485 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
486 self.wait_for_sqlserver(&running.container_id, host_port)
487 .await?
488 }
489 "db2-se" | "db2-ae" => self.wait_for_db2(&running.container_id, host_port).await?,
490 _ => {
491 self.wait_for_postgres(username, password, db_name, host_port)
492 .await?
493 }
494 };
495 let running = RunningDbContainer {
496 container_id: running.container_id,
497 host_port,
498 endpoint_address: self.net.sibling_host.clone(),
499 endpoint_port: host_port,
500 };
501 self.containers
502 .write()
503 .insert(db_instance_identifier.to_string(), running.clone());
504 Ok(running)
505 }
506
507 pub async fn stop_all(&self) {
508 let containers: Vec<String> = {
509 let mut containers = self.containers.write();
510 containers
511 .drain()
512 .map(|(_, container)| container.container_id)
513 .collect()
514 };
515 for container_id in containers {
516 self.remove_container(&container_id).await;
517 }
518 }
519
520 async fn lookup_port(&self, container_id: &str, port: &str) -> Result<u16, RuntimeError> {
521 let port_output = tokio::process::Command::new(&self.cli)
522 .args(["port", container_id, port])
523 .output()
524 .await
525 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
526
527 let port_str = String::from_utf8_lossy(&port_output.stdout);
528 port_str
529 .trim()
530 .rsplit(':')
531 .next()
532 .and_then(|value| value.parse::<u16>().ok())
533 .ok_or_else(|| {
534 RuntimeError::ContainerStartFailed(format!(
535 "could not determine container port from '{}'",
536 port_str.trim()
537 ))
538 })
539 }
540
541 async fn wait_for_postgres(
542 &self,
543 username: &str,
544 password: &str,
545 db_name: &str,
546 host_port: u16,
547 ) -> Result<(), RuntimeError> {
548 for _ in 0..40 {
549 tokio::time::sleep(Duration::from_millis(500)).await;
550 let connection_string = format!(
551 "host={} port={host_port} user={username} password={password} dbname={db_name}",
552 self.net.sibling_host
553 );
554 if let Ok((client, connection)) =
555 tokio_postgres::connect(&connection_string, NoTls).await
556 {
557 tokio::spawn(async move {
558 let _ = connection.await;
559 });
560 if client.simple_query("SELECT 1").await.is_ok() {
561 return Ok(());
562 }
563 }
564 }
565
566 Err(RuntimeError::ContainerStartFailed(
567 "postgres container did not become ready within 20 seconds".to_string(),
568 ))
569 }
570
571 async fn wait_for_mysql(
572 &self,
573 username: &str,
574 password: &str,
575 db_name: &str,
576 host_port: u16,
577 ) -> Result<(), RuntimeError> {
578 use mysql_async::prelude::*;
579 use mysql_async::OptsBuilder;
580
581 for attempt in 1..=40 {
582 let opts = OptsBuilder::default()
583 .ip_or_hostname(self.net.sibling_host.as_str())
584 .tcp_port(host_port)
585 .user(Some(username))
586 .pass(Some(password))
587 .db_name(Some(db_name));
588
589 match mysql_async::Conn::new(opts).await {
590 Ok(mut conn) => {
591 if conn.query_drop("SELECT 1").await.is_ok() {
592 let _ = conn.disconnect().await;
593 return Ok(());
594 }
595 }
596 Err(_) => {
597 if attempt < 40 {
598 tokio::time::sleep(Duration::from_millis(500)).await;
599 }
600 continue;
601 }
602 }
603 }
604
605 Err(RuntimeError::ContainerStartFailed(
606 "MySQL/MariaDB container did not become ready within 20 seconds".to_string(),
607 ))
608 }
609
610 async fn wait_for_oracle(
616 &self,
617 container_id: &str,
618 host_port: u16,
619 ) -> Result<(), RuntimeError> {
620 self.wait_for_log_marker(container_id, "DATABASE IS READY TO USE!", 240)
621 .await?;
622 self.wait_for_tcp(host_port, 30).await
623 }
624
625 async fn wait_for_sqlserver(
629 &self,
630 container_id: &str,
631 host_port: u16,
632 ) -> Result<(), RuntimeError> {
633 self.wait_for_log_marker(
634 container_id,
635 "SQL Server is now ready for client connections",
636 180,
637 )
638 .await?;
639 self.wait_for_tcp(host_port, 30).await
640 }
641
642 async fn wait_for_db2(&self, container_id: &str, host_port: u16) -> Result<(), RuntimeError> {
646 self.wait_for_log_marker(container_id, "Setup has completed", 360)
647 .await?;
648 self.wait_for_tcp(host_port, 60).await
649 }
650
651 async fn wait_for_log_marker(
654 &self,
655 container_id: &str,
656 marker: &str,
657 deadline_secs: u64,
658 ) -> Result<(), RuntimeError> {
659 let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
660 while std::time::Instant::now() < deadline {
661 let output = tokio::process::Command::new(&self.cli)
662 .args(["logs", container_id])
663 .output()
664 .await
665 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
666 let stdout = String::from_utf8_lossy(&output.stdout);
667 let stderr = String::from_utf8_lossy(&output.stderr);
668 if stdout.contains(marker) || stderr.contains(marker) {
669 return Ok(());
670 }
671 tokio::time::sleep(Duration::from_secs(2)).await;
672 }
673 Err(RuntimeError::ContainerStartFailed(format!(
674 "container did not log '{}' within {} seconds",
675 marker, deadline_secs
676 )))
677 }
678
679 async fn wait_for_tcp(&self, host_port: u16, deadline_secs: u64) -> Result<(), RuntimeError> {
683 let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
684 let host = self.net.sibling_host.as_str();
685 while std::time::Instant::now() < deadline {
686 if tokio::net::TcpStream::connect((host, host_port))
687 .await
688 .is_ok()
689 {
690 return Ok(());
691 }
692 tokio::time::sleep(Duration::from_millis(500)).await;
693 }
694 Err(RuntimeError::ContainerStartFailed(format!(
695 "TCP probe to {}:{} did not succeed within {}s",
696 host, host_port, deadline_secs
697 )))
698 }
699
700 async fn remove_container(&self, container_id: &str) {
701 let _ = tokio::process::Command::new(&self.cli)
702 .args(["rm", "-f", container_id])
703 .output()
704 .await;
705 }
706
707 pub(crate) async fn ensure_postgres_image(
728 &self,
729 major_version: &str,
730 ) -> Result<String, RuntimeError> {
731 let tag = bridge_image_tag("fakecloud-postgres", major_version);
732 self.ensure_bridge_image(&tag, |tag| async move {
733 self.build_postgres_image_local(major_version, &tag).await
734 })
735 .await
736 }
737
738 async fn docker_image_exists(&self, tag: &str) -> bool {
739 tokio::process::Command::new(&self.cli)
740 .args(["image", "inspect", tag])
741 .stdout(std::process::Stdio::null())
742 .stderr(std::process::Stdio::null())
743 .status()
744 .await
745 .map(|status| status.success())
746 .unwrap_or(false)
747 }
748
749 async fn try_pull_image(&self, tag: &str) -> bool {
750 tracing::info!(tag = %tag, "Pulling prebuilt fakecloud-postgres image");
751 let output = match tokio::process::Command::new(&self.cli)
752 .args(["pull", tag])
753 .output()
754 .await
755 {
756 Ok(output) => output,
757 Err(e) => {
758 tracing::debug!(tag = %tag, error = %e, "docker pull failed to spawn");
759 return false;
760 }
761 };
762 if output.status.success() {
763 return true;
764 }
765 tracing::info!(
766 tag = %tag,
767 stderr = %String::from_utf8_lossy(&output.stderr).trim(),
768 "Prebuilt postgres image not available, falling back to local build"
769 );
770 false
771 }
772
773 async fn build_postgres_image_local(
774 &self,
775 major_version: &str,
776 tag: &str,
777 ) -> Result<(), RuntimeError> {
778 let assets: [(&str, &str); 8] = [
779 ("Dockerfile", POSTGRES_DOCKERFILE),
780 ("aws_commons.control", AWS_COMMONS_CONTROL),
781 ("aws_commons--1.1.sql", AWS_COMMONS_SQL),
782 ("aws_commons--1.0--1.1.sql", AWS_COMMONS_UPGRADE_SQL),
783 ("aws_lambda.control", AWS_LAMBDA_CONTROL),
784 ("aws_lambda--1.0.sql", AWS_LAMBDA_SQL),
785 ("aws_s3.control", AWS_S3_CONTROL),
786 ("aws_s3--1.0.sql", AWS_S3_SQL),
787 ];
788 self.build_image_local(
789 tag,
790 &assets,
791 &format!("PG_VERSION={major_version}"),
792 "fakecloud-postgres",
793 )
794 .await
795 }
796
797 pub(crate) async fn ensure_mysql_image(
802 &self,
803 major_version: &str,
804 ) -> Result<String, RuntimeError> {
805 let tag = bridge_image_tag("fakecloud-mysql", major_version);
806 self.ensure_bridge_image(&tag, |tag| async move {
807 self.build_mysql_image_local(major_version, &tag).await
808 })
809 .await
810 }
811
812 pub(crate) async fn ensure_mariadb_image(
813 &self,
814 major_version: &str,
815 ) -> Result<String, RuntimeError> {
816 let tag = bridge_image_tag("fakecloud-mariadb", major_version);
817 self.ensure_bridge_image(&tag, |tag| async move {
818 self.build_mariadb_image_local(major_version, &tag).await
819 })
820 .await
821 }
822
823 async fn ensure_bridge_image<F, Fut>(
828 &self,
829 tag: &str,
830 build_local: F,
831 ) -> Result<String, RuntimeError>
832 where
833 F: FnOnce(String) -> Fut,
834 Fut: std::future::Future<Output = Result<(), RuntimeError>>,
835 {
836 let lock = {
837 let mut cache = self.image_cache.write();
838 cache
839 .entry(tag.to_string())
840 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(false)))
841 .clone()
842 };
843 let mut resolved = lock.lock().await;
844 if *resolved {
845 return Ok(tag.to_string());
846 }
847
848 let force_rebuild = std::env::var("FAKECLOUD_REBUILD_POSTGRES_IMAGE")
849 .map(|v| !v.is_empty())
850 .unwrap_or(false);
851
852 if !force_rebuild {
853 if self.docker_image_exists(tag).await {
854 *resolved = true;
855 return Ok(tag.to_string());
856 }
857 if self.try_pull_image(tag).await {
858 *resolved = true;
859 return Ok(tag.to_string());
860 }
861 }
862
863 build_local(tag.to_string()).await?;
864 *resolved = true;
865 Ok(tag.to_string())
866 }
867
868 async fn build_mysql_image_local(
869 &self,
870 major_version: &str,
871 tag: &str,
872 ) -> Result<(), RuntimeError> {
873 let assets: [(&str, &str); 4] = [
874 ("Dockerfile", MYSQL_DOCKERFILE),
875 ("fakecloud_udf.c", MYSQL_UDF_C),
876 ("fakecloud-bootstrap.sh", MYSQL_BOOTSTRAP_SH),
877 ("99-fakecloud-bootstrap.sql.tmpl", MYSQL_BOOTSTRAP_SQL),
878 ];
879 self.build_image_local(
880 tag,
881 &assets,
882 &format!("MYSQL_VERSION={major_version}"),
883 "fakecloud-mysql",
884 )
885 .await
886 }
887
888 async fn build_mariadb_image_local(
889 &self,
890 major_version: &str,
891 tag: &str,
892 ) -> Result<(), RuntimeError> {
893 let assets: [(&str, &str); 4] = [
894 ("Dockerfile", MARIADB_DOCKERFILE),
895 ("fakecloud_udf.c", MARIADB_UDF_C),
896 ("fakecloud-bootstrap.sh", MARIADB_BOOTSTRAP_SH),
897 ("99-fakecloud-bootstrap.sql.tmpl", MARIADB_BOOTSTRAP_SQL),
898 ];
899 self.build_image_local(
900 tag,
901 &assets,
902 &format!("MARIADB_VERSION={major_version}"),
903 "fakecloud-mariadb",
904 )
905 .await
906 }
907
908 async fn build_image_local(
909 &self,
910 tag: &str,
911 assets: &[(&str, &str)],
912 build_arg: &str,
913 image_label: &str,
914 ) -> Result<(), RuntimeError> {
915 let build_dir =
916 tempfile::tempdir().map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
917 for (name, contents) in assets {
918 tokio::fs::write(build_dir.path().join(name), contents)
919 .await
920 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
921 }
922
923 tracing::info!(
924 tag = %tag,
925 image = %image_label,
926 "Building {image_label} image locally (first use can take ~60s)"
927 );
928
929 let output = tokio::process::Command::new(&self.cli)
930 .args(["build", "--build-arg", build_arg, "-t", tag, "."])
931 .current_dir(build_dir.path())
932 .output()
933 .await
934 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
935
936 if !output.status.success() {
937 return Err(RuntimeError::ContainerStartFailed(format!(
938 "docker build for {} failed: {}",
939 tag,
940 String::from_utf8_lossy(&output.stderr).trim()
941 )));
942 }
943
944 Ok(())
945 }
946
947 pub async fn dump_database(
948 &self,
949 db_instance_identifier: &str,
950 engine: &str,
951 username: &str,
952 password: &str,
953 db_name: &str,
954 ) -> Result<Vec<u8>, RuntimeError> {
955 let container = self
956 .containers
957 .read()
958 .get(db_instance_identifier)
959 .cloned()
960 .ok_or(RuntimeError::Unavailable)?;
961
962 if let Some(k) = &self.k8s {
963 return k
964 .dump_database(&container.container_id, engine, username, password, db_name)
965 .await;
966 }
967
968 let args: Vec<String> = match engine {
969 "mysql" | "mariadb" => vec![
970 "exec".into(),
971 container.container_id.clone(),
972 "mysqldump".into(),
973 "-u".into(),
974 username.into(),
975 format!("-p{password}"),
976 db_name.into(),
977 ],
978 "postgres" => vec![
979 "exec".into(),
980 container.container_id.clone(),
981 "pg_dump".into(),
982 "-U".into(),
983 username.into(),
984 "-d".into(),
985 db_name.into(),
986 "--no-password".into(),
987 ],
988 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
995 | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
996 return Err(RuntimeError::ContainerStartFailed(format!(
997 "engine {engine} is not yet supported by the snapshot/read-replica path; \
998 emulator stores the API state but cannot dump the underlying database"
999 )));
1000 }
1001 other => {
1002 return Err(RuntimeError::ContainerStartFailed(format!(
1003 "engine {other} is not supported by dump_database"
1004 )));
1005 }
1006 };
1007
1008 let output = tokio::process::Command::new(&self.cli)
1009 .args(&args)
1010 .output()
1011 .await
1012 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1013
1014 if !output.status.success() {
1015 return Err(RuntimeError::ContainerStartFailed(format!(
1016 "dump failed: {}",
1017 String::from_utf8_lossy(&output.stderr).trim()
1018 )));
1019 }
1020
1021 Ok(output.stdout)
1022 }
1023
1024 pub async fn read_log_file(
1030 &self,
1031 db_instance_identifier: &str,
1032 container_path: &str,
1033 ) -> Result<Vec<u8>, RuntimeError> {
1034 let container = self
1035 .containers
1036 .read()
1037 .get(db_instance_identifier)
1038 .cloned()
1039 .ok_or(RuntimeError::Unavailable)?;
1040
1041 if let Some(k) = &self.k8s {
1042 return k.read_file(&container.container_id, container_path).await;
1043 }
1044
1045 let output = tokio::process::Command::new(&self.cli)
1046 .args(["exec", &container.container_id, "cat", container_path])
1047 .output()
1048 .await
1049 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1050
1051 if !output.status.success() {
1052 return Err(RuntimeError::ContainerStartFailed(format!(
1053 "cat {container_path} failed: {}",
1054 String::from_utf8_lossy(&output.stderr).trim()
1055 )));
1056 }
1057
1058 Ok(output.stdout)
1059 }
1060
1061 pub async fn restore_database(
1062 &self,
1063 db_instance_identifier: &str,
1064 engine: &str,
1065 username: &str,
1066 password: &str,
1067 db_name: &str,
1068 dump_data: &[u8],
1069 ) -> Result<(), RuntimeError> {
1070 let container = self
1071 .containers
1072 .read()
1073 .get(db_instance_identifier)
1074 .cloned()
1075 .ok_or(RuntimeError::Unavailable)?;
1076
1077 if let Some(k) = &self.k8s {
1078 return k
1079 .restore_database(
1080 &container.container_id,
1081 engine,
1082 username,
1083 password,
1084 db_name,
1085 dump_data,
1086 )
1087 .await;
1088 }
1089
1090 let args: Vec<String> = match engine {
1091 "mysql" | "mariadb" => vec![
1092 "exec".into(),
1093 "-i".into(),
1094 container.container_id.clone(),
1095 "mysql".into(),
1096 "-u".into(),
1097 username.into(),
1098 format!("-p{password}"),
1099 db_name.into(),
1100 ],
1101 "postgres" => vec![
1102 "exec".into(),
1103 "-i".into(),
1104 container.container_id.clone(),
1105 "psql".into(),
1106 "-U".into(),
1107 username.into(),
1108 "-d".into(),
1109 db_name.into(),
1110 "--no-password".into(),
1111 "-v".into(),
1112 "ON_ERROR_STOP=1".into(),
1113 ],
1114 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
1115 | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
1116 return Err(RuntimeError::ContainerStartFailed(format!(
1117 "engine {engine} is not yet supported by the snapshot-restore path"
1118 )));
1119 }
1120 other => {
1121 return Err(RuntimeError::ContainerStartFailed(format!(
1122 "engine {other} is not supported by restore_database"
1123 )));
1124 }
1125 };
1126
1127 let mut child = tokio::process::Command::new(&self.cli)
1128 .args(&args)
1129 .stdin(std::process::Stdio::piped())
1130 .stdout(std::process::Stdio::piped())
1131 .stderr(std::process::Stdio::piped())
1132 .spawn()
1133 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1134
1135 if let Some(mut stdin) = child.stdin.take() {
1136 use tokio::io::AsyncWriteExt;
1137 stdin
1138 .write_all(dump_data)
1139 .await
1140 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1141 drop(stdin);
1142 }
1143
1144 let output = child
1145 .wait_with_output()
1146 .await
1147 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1148
1149 if !output.status.success() {
1150 return Err(RuntimeError::ContainerStartFailed(format!(
1151 "restore failed: {}",
1152 String::from_utf8_lossy(&output.stderr).trim()
1153 )));
1154 }
1155
1156 Ok(())
1157 }
1158}
1159
1160pub(crate) fn bridge_image_tag(image: &str, major_version: &str) -> String {
1169 let registry = std::env::var("FAKECLOUD_POSTGRES_REGISTRY")
1170 .unwrap_or_else(|_| DEFAULT_POSTGRES_REGISTRY.to_string());
1171 let registry = registry.trim_end_matches('/');
1172 format!(
1173 "{}/{}:{}-{}",
1174 registry,
1175 image,
1176 major_version,
1177 env!("CARGO_PKG_VERSION")
1178 )
1179}
1180
#[cfg(test)]
mod tests {
    use super::*;

    /// Environment variable consulted by `bridge_image_tag`.
    const REGISTRY_VAR: &str = "FAKECLOUD_POSTGRES_REGISTRY";

    /// Appends `-<crate version>` to `reference`, mirroring the tag suffix.
    fn with_crate_version(reference: &str) -> String {
        format!("{}-{}", reference, env!("CARGO_PKG_VERSION"))
    }

    /// Covers the default registry, a custom registry, and a custom registry with a
    /// trailing slash. All cases share one test because they mutate the same
    /// process-wide environment variable.
    #[test]
    fn bridge_image_tag_resolves_registry_overrides() {
        let saved = std::env::var(REGISTRY_VAR).ok();

        // Default registry, one case per bridge image.
        std::env::remove_var(REGISTRY_VAR);
        let default_cases = [
            (
                "fakecloud-postgres",
                "16",
                "ghcr.io/faiscadev/fakecloud-postgres:16",
            ),
            (
                "fakecloud-mysql",
                "8.0",
                "ghcr.io/faiscadev/fakecloud-mysql:8.0",
            ),
            (
                "fakecloud-mariadb",
                "10.11",
                "ghcr.io/faiscadev/fakecloud-mariadb:10.11",
            ),
        ];
        for (image, major_version, expected) in default_cases {
            assert_eq!(
                bridge_image_tag(image, major_version),
                with_crate_version(expected)
            );
        }

        // Custom registry without a trailing slash.
        std::env::set_var(REGISTRY_VAR, "registry.example.com/team");
        assert_eq!(
            bridge_image_tag("fakecloud-postgres", "15"),
            with_crate_version("registry.example.com/team/fakecloud-postgres:15")
        );

        // A trailing slash must not produce a double slash.
        std::env::set_var(REGISTRY_VAR, "registry.example.com/team/");
        assert_eq!(
            bridge_image_tag("fakecloud-postgres", "13"),
            with_crate_version("registry.example.com/team/fakecloud-postgres:13")
        );

        // Put the environment back the way it was found.
        if let Some(value) = saved {
            std::env::set_var(REGISTRY_VAR, value);
        } else {
            std::env::remove_var(REGISTRY_VAR);
        }
    }
}