1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fakecloud_core::container_net::{detect_container_cli, HostNetworking};
6use parking_lot::RwLock;
7use tokio_postgres::NoTls;
8
9mod k8s;
10
11const POSTGRES_DOCKERFILE: &str = include_str!("../../assets/postgres/Dockerfile");
12const AWS_COMMONS_CONTROL: &str = include_str!("../../assets/postgres/aws_commons.control");
13const AWS_COMMONS_SQL: &str = include_str!("../../assets/postgres/aws_commons--1.1.sql");
14const AWS_COMMONS_UPGRADE_SQL: &str =
15 include_str!("../../assets/postgres/aws_commons--1.0--1.1.sql");
16const AWS_LAMBDA_CONTROL: &str = include_str!("../../assets/postgres/aws_lambda.control");
17const AWS_LAMBDA_SQL: &str = include_str!("../../assets/postgres/aws_lambda--1.0.sql");
18const AWS_S3_CONTROL: &str = include_str!("../../assets/postgres/aws_s3.control");
19const AWS_S3_SQL: &str = include_str!("../../assets/postgres/aws_s3--1.0.sql");
20
21const MYSQL_DOCKERFILE: &str = include_str!("../../assets/mysql/Dockerfile");
22const MYSQL_UDF_C: &str = include_str!("../../assets/mysql/fakecloud_udf.c");
23const MYSQL_BOOTSTRAP_SH: &str = include_str!("../../assets/mysql/fakecloud-bootstrap.sh");
24const MYSQL_BOOTSTRAP_SQL: &str =
25 include_str!("../../assets/mysql/99-fakecloud-bootstrap.sql.tmpl");
26
27const MARIADB_DOCKERFILE: &str = include_str!("../../assets/mariadb/Dockerfile");
28const MARIADB_UDF_C: &str = include_str!("../../assets/mariadb/fakecloud_udf.c");
29const MARIADB_BOOTSTRAP_SH: &str = include_str!("../../assets/mariadb/fakecloud-bootstrap.sh");
30const MARIADB_BOOTSTRAP_SQL: &str =
31 include_str!("../../assets/mariadb/99-fakecloud-bootstrap.sql.tmpl");
32
33const DEFAULT_POSTGRES_REGISTRY: &str = "ghcr.io/faiscadev";
40
41#[derive(Debug, Clone)]
42pub struct RunningDbContainer {
43 pub container_id: String,
45 pub host_port: u16,
47 pub endpoint_address: String,
50 pub endpoint_port: u16,
53}
54
55pub struct RdsRuntime {
56 cli: String,
57 containers: RwLock<HashMap<String, RunningDbContainer>>,
58 instance_id: String,
59 net: HostNetworking,
65 server_port: u16,
66 image_cache: RwLock<HashMap<String, Arc<tokio::sync::Mutex<bool>>>>,
67 k8s: Option<k8s::K8sDb>,
71}
72
73#[derive(Debug, thiserror::Error)]
74pub enum RuntimeError {
75 #[error("container runtime is unavailable")]
76 Unavailable,
77 #[error("container failed to start: {0}")]
78 ContainerStartFailed(String),
79}
80
81#[derive(Debug, thiserror::Error)]
85pub enum BackendInitError {
86 #[error(transparent)]
87 Env(#[from] fakecloud_k8s::K8sEnvError),
88 #[error(transparent)]
89 PodConfig(#[from] fakecloud_k8s::K8sPodConfigError),
90 #[error("failed to connect to the Kubernetes cluster: {0}")]
91 Connect(String),
92}
93
94impl RdsRuntime {
95 pub fn new(server_port: u16) -> Option<Self> {
96 let cli = detect_container_cli()?;
103 let net = HostNetworking::detect(&cli);
104
105 Some(Self {
106 cli,
107 containers: RwLock::new(HashMap::new()),
108 instance_id: format!("fakecloud-{}", std::process::id()),
109 net,
110 server_port,
111 image_cache: RwLock::new(HashMap::new()),
112 k8s: None,
113 })
114 }
115
116 pub async fn new_k8s(server_port: u16) -> Result<Self, BackendInitError> {
120 let db = k8s::K8sDb::from_env(server_port).await?;
121 Ok(Self {
122 cli: String::new(),
123 containers: RwLock::new(HashMap::new()),
124 instance_id: format!("fakecloud-{}", std::process::id()),
125 net: HostNetworking {
129 host_alias: String::new(),
130 add_host_arg: None,
131 sibling_host: "127.0.0.1".to_string(),
132 },
133 server_port,
134 image_cache: RwLock::new(HashMap::new()),
135 k8s: Some(db),
136 })
137 }
138
139 pub fn cli_name(&self) -> &str {
140 if self.k8s.is_some() {
141 "kubernetes"
142 } else {
143 &self.cli
144 }
145 }
146
147 #[cfg(test)]
153 pub(crate) fn new_stub() -> Self {
154 Self {
155 cli: "true".to_string(),
156 containers: RwLock::new(HashMap::new()),
157 instance_id: format!("fakecloud-{}", std::process::id()),
158 net: HostNetworking {
159 host_alias: String::new(),
160 add_host_arg: None,
161 sibling_host: "127.0.0.1".to_string(),
162 },
163 server_port: 0,
164 image_cache: RwLock::new(HashMap::new()),
165 k8s: None,
166 }
167 }
168
169 pub async fn reap_stale(&self) {
172 if let Some(k) = &self.k8s {
173 k.reap_stale().await;
174 }
175 }
176
177 #[allow(clippy::too_many_arguments)]
178 pub async fn ensure_postgres(
179 &self,
180 db_instance_identifier: &str,
181 engine: &str,
182 engine_version: &str,
183 username: &str,
184 password: &str,
185 db_name: &str,
186 account_id: &str,
187 region: &str,
188 tags: &[crate::state::RdsTag],
189 ) -> Result<RunningDbContainer, RuntimeError> {
190 if let Some(k) = &self.k8s {
191 let tag_map: std::collections::BTreeMap<String, String> = tags
195 .iter()
196 .map(|t| (t.key.clone(), t.value.clone()))
197 .collect();
198 let running = k
199 .ensure(
200 db_instance_identifier,
201 engine,
202 engine_version,
203 username,
204 password,
205 db_name,
206 account_id,
207 region,
208 &tag_map,
209 )
210 .await?;
211 self.containers
212 .write()
213 .insert(db_instance_identifier.to_string(), running.clone());
214 return Ok(running);
215 }
216 self.stop_container(db_instance_identifier).await;
217
218 let (image, port, env_vars, bridge_engine_version) = match engine {
226 "postgres" => {
227 let major_version = engine_version.split('.').next().unwrap_or("16");
228 let image = self.ensure_postgres_image(major_version).await?;
229 let env_vars = vec![
230 format!("POSTGRES_USER={username}"),
231 format!("POSTGRES_PASSWORD={password}"),
232 format!("POSTGRES_DB={db_name}"),
233 format!(
234 "FAKECLOUD_ENDPOINT=http://{}:{}",
235 self.net.host_alias, self.server_port
236 ),
237 format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
238 format!("FAKECLOUD_REGION={region}"),
239 ];
240 (image, "5432", env_vars, Some(major_version.to_string()))
241 }
242 "mysql" => {
243 let _ = engine_version;
248 let major_version = "8.0";
249 let image = self.ensure_mysql_image(major_version).await?;
250 let env_vars = vec![
251 format!("MYSQL_ROOT_PASSWORD={password}"),
252 format!("MYSQL_USER={username}"),
253 format!("MYSQL_PASSWORD={password}"),
254 format!("MYSQL_DATABASE={db_name}"),
255 format!(
256 "FAKECLOUD_ENDPOINT=http://{}:{}",
257 self.net.host_alias, self.server_port
258 ),
259 format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
260 format!("FAKECLOUD_REGION={region}"),
261 ];
262 (image, "3306", env_vars, Some(major_version.to_string()))
263 }
264 "mariadb" => {
265 let major_version = if engine_version.starts_with("10.11") {
266 "10.11"
267 } else if engine_version.starts_with("11.4") {
268 "11.4"
269 } else {
270 "10.6"
271 };
272 let image = self.ensure_mariadb_image(major_version).await?;
273 let env_vars = vec![
274 format!("MARIADB_ROOT_PASSWORD={password}"),
275 format!("MARIADB_USER={username}"),
276 format!("MARIADB_PASSWORD={password}"),
277 format!("MARIADB_DATABASE={db_name}"),
278 format!(
279 "FAKECLOUD_ENDPOINT=http://{}:{}",
280 self.net.host_alias, self.server_port
281 ),
282 format!("FAKECLOUD_ACCOUNT_ID={account_id}"),
283 format!("FAKECLOUD_REGION={region}"),
284 ];
285 (image, "3306", env_vars, Some(major_version.to_string()))
286 }
287 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
288 let image = "gvenzl/oracle-free:23-slim".to_string();
293 let env_vars = vec![
294 format!("ORACLE_PASSWORD={password}"),
295 format!("APP_USER={username}"),
296 format!("APP_USER_PASSWORD={password}"),
297 format!("ORACLE_DATABASE={db_name}"),
298 ];
299 (image, "1521", env_vars, None)
300 }
301 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
302 let image = "mcr.microsoft.com/mssql/server:2022-latest".to_string();
308 let env_vars = vec![
309 "ACCEPT_EULA=Y".to_string(),
310 format!("MSSQL_SA_PASSWORD={password}"),
311 "MSSQL_PID=Express".to_string(),
312 ];
313 (image, "1433", env_vars, None)
314 }
315 "db2-se" | "db2-ae" => {
316 let image = "icr.io/db2_community/db2:latest".to_string();
320 let env_vars = vec![
321 "LICENSE=accept".to_string(),
322 "DB2INSTANCE=db2inst1".to_string(),
323 format!("DB2INST1_PASSWORD={password}"),
324 format!("DBNAME={db_name}"),
325 ];
326 (image, "50000", env_vars, None)
327 }
328 _ => {
329 return Err(RuntimeError::ContainerStartFailed(format!(
330 "Unsupported engine: {}",
331 engine
332 )))
333 }
334 };
335
336 let needs_privileged = matches!(engine, "db2-se" | "db2-ae");
338
339 let mut args = vec![
341 "create".to_string(),
342 "-p".to_string(),
343 format!(":{}", port),
344 "--label".to_string(),
345 format!("fakecloud-rds={db_instance_identifier}"),
346 "--label".to_string(),
347 format!("fakecloud-instance={}", self.instance_id),
348 ];
349
350 if needs_privileged {
351 args.push("--privileged".to_string());
352 }
353
354 if db_volumes_enabled() {
364 if let Some(data_dir) = engine_data_dir(engine) {
365 args.push("-v".to_string());
366 args.push(format!(
367 "{}:{data_dir}",
368 data_volume_name(account_id, db_instance_identifier)
369 ));
370 }
371 }
372
373 if bridge_engine_version.is_some() {
380 self.net.push_add_host_args(&mut args);
381 }
382
383 for env_var in env_vars {
384 args.push("-e".to_string());
385 args.push(env_var);
386 }
387
388 args.push(image);
389
390 let output = tokio::process::Command::new(&self.cli)
391 .args(&args)
392 .output()
393 .await
394 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
395
396 if !output.status.success() {
397 return Err(RuntimeError::ContainerStartFailed(
398 String::from_utf8_lossy(&output.stderr).trim().to_string(),
399 ));
400 }
401
402 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
403 let start_result = tokio::process::Command::new(&self.cli)
404 .args(["start", &container_id])
405 .output()
406 .await
407 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
408
409 if !start_result.status.success() {
410 self.remove_container(&container_id).await;
411 return Err(RuntimeError::ContainerStartFailed(format!(
412 "container start failed: {}",
413 String::from_utf8_lossy(&start_result.stderr).trim()
414 )));
415 }
416
417 let host_port = match self.lookup_port(&container_id, port).await {
418 Ok(host_port) => host_port,
419 Err(error) => {
420 self.remove_container(&container_id).await;
421 return Err(error);
422 }
423 };
424
425 let wait_result = match engine {
427 "postgres" => {
428 self.wait_for_postgres(username, password, db_name, host_port)
429 .await
430 }
431 "mysql" | "mariadb" => {
432 self.wait_for_mysql(username, password, db_name, host_port)
433 .await
434 }
435 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
436 self.wait_for_oracle(&container_id, host_port).await
437 }
438 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
439 self.wait_for_sqlserver(&container_id, host_port).await
440 }
441 "db2-se" | "db2-ae" => self.wait_for_db2(&container_id, host_port).await,
442 _ => unreachable!("engine already validated"),
443 };
444
445 if let Err(error) = wait_result {
446 self.remove_container(&container_id).await;
447 return Err(error);
448 }
449
450 let running = RunningDbContainer {
451 container_id,
452 host_port,
453 endpoint_address: self.net.sibling_host.clone(),
457 endpoint_port: host_port,
458 };
459 self.containers
460 .write()
461 .insert(db_instance_identifier.to_string(), running.clone());
462 Ok(running)
463 }
464
465 pub async fn stop_container(&self, db_instance_identifier: &str) {
466 let container = self.containers.write().remove(db_instance_identifier);
467 if let Some(container) = container {
468 if let Some(k) = &self.k8s {
469 k.delete_pod(&container.container_id).await;
470 } else {
471 self.remove_container(&container.container_id).await;
472 }
473 }
474 }
475
476 pub async fn restart_container(
477 &self,
478 db_instance_identifier: &str,
479 engine: &str,
480 username: &str,
481 password: &str,
482 db_name: &str,
483 ) -> Result<RunningDbContainer, RuntimeError> {
484 if let Some(k) = &self.k8s {
485 let running = k
486 .restart(db_instance_identifier, engine, username, password, db_name)
487 .await?;
488 self.containers
489 .write()
490 .insert(db_instance_identifier.to_string(), running.clone());
491 return Ok(running);
492 }
493 let running = self
494 .containers
495 .read()
496 .get(db_instance_identifier)
497 .cloned()
498 .ok_or(RuntimeError::Unavailable)?;
499
500 let output = tokio::process::Command::new(&self.cli)
501 .args(["restart", &running.container_id])
502 .output()
503 .await
504 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
505
506 if !output.status.success() {
507 return Err(RuntimeError::ContainerStartFailed(format!(
508 "container restart failed: {}",
509 String::from_utf8_lossy(&output.stderr).trim()
510 )));
511 }
512
513 let port = match engine {
514 "postgres" => "5432",
515 "mysql" | "mariadb" => "3306",
516 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => "1521",
517 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => "1433",
518 "db2-se" | "db2-ae" => "50000",
519 _ => "5432", };
521
522 let host_port = self.lookup_port(&running.container_id, port).await?;
523
524 match engine {
525 "postgres" => {
526 self.wait_for_postgres(username, password, db_name, host_port)
527 .await?
528 }
529 "mysql" | "mariadb" => {
530 self.wait_for_mysql(username, password, db_name, host_port)
531 .await?
532 }
533 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
534 self.wait_for_oracle(&running.container_id, host_port)
535 .await?
536 }
537 "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
538 self.wait_for_sqlserver(&running.container_id, host_port)
539 .await?
540 }
541 "db2-se" | "db2-ae" => self.wait_for_db2(&running.container_id, host_port).await?,
542 _ => {
543 self.wait_for_postgres(username, password, db_name, host_port)
544 .await?
545 }
546 };
547 let running = RunningDbContainer {
548 container_id: running.container_id,
549 host_port,
550 endpoint_address: self.net.sibling_host.clone(),
551 endpoint_port: host_port,
552 };
553 self.containers
554 .write()
555 .insert(db_instance_identifier.to_string(), running.clone());
556 Ok(running)
557 }
558
559 pub async fn stop_all(&self) {
560 let containers: Vec<String> = {
561 let mut containers = self.containers.write();
562 containers
563 .drain()
564 .map(|(_, container)| container.container_id)
565 .collect()
566 };
567 for container_id in containers {
568 self.remove_container(&container_id).await;
569 }
570 }
571
572 async fn lookup_port(&self, container_id: &str, port: &str) -> Result<u16, RuntimeError> {
573 let port_output = tokio::process::Command::new(&self.cli)
574 .args(["port", container_id, port])
575 .output()
576 .await
577 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
578
579 let port_str = String::from_utf8_lossy(&port_output.stdout);
580 port_str
581 .trim()
582 .rsplit(':')
583 .next()
584 .and_then(|value| value.parse::<u16>().ok())
585 .ok_or_else(|| {
586 RuntimeError::ContainerStartFailed(format!(
587 "could not determine container port from '{}'",
588 port_str.trim()
589 ))
590 })
591 }
592
593 async fn wait_for_postgres(
594 &self,
595 username: &str,
596 password: &str,
597 db_name: &str,
598 host_port: u16,
599 ) -> Result<(), RuntimeError> {
600 for _ in 0..40 {
601 tokio::time::sleep(Duration::from_millis(500)).await;
602 let connection_string = format!(
603 "host={} port={host_port} user={username} password={password} dbname={db_name}",
604 self.net.sibling_host
605 );
606 if let Ok((client, connection)) =
607 tokio_postgres::connect(&connection_string, NoTls).await
608 {
609 tokio::spawn(async move {
610 let _ = connection.await;
611 });
612 if client.simple_query("SELECT 1").await.is_ok() {
613 return Ok(());
614 }
615 }
616 }
617
618 Err(RuntimeError::ContainerStartFailed(
619 "postgres container did not become ready within 20 seconds".to_string(),
620 ))
621 }
622
623 async fn wait_for_mysql(
624 &self,
625 username: &str,
626 password: &str,
627 db_name: &str,
628 host_port: u16,
629 ) -> Result<(), RuntimeError> {
630 use mysql_async::prelude::*;
631 use mysql_async::OptsBuilder;
632
633 for attempt in 1..=40 {
634 let opts = OptsBuilder::default()
635 .ip_or_hostname(self.net.sibling_host.as_str())
636 .tcp_port(host_port)
637 .user(Some(username))
638 .pass(Some(password))
639 .db_name(Some(db_name));
640
641 match mysql_async::Conn::new(opts).await {
642 Ok(mut conn) => {
643 if conn.query_drop("SELECT 1").await.is_ok() {
644 let _ = conn.disconnect().await;
645 return Ok(());
646 }
647 }
648 Err(_) => {
649 if attempt < 40 {
650 tokio::time::sleep(Duration::from_millis(500)).await;
651 }
652 continue;
653 }
654 }
655 }
656
657 Err(RuntimeError::ContainerStartFailed(
658 "MySQL/MariaDB container did not become ready within 20 seconds".to_string(),
659 ))
660 }
661
662 async fn wait_for_oracle(
668 &self,
669 container_id: &str,
670 host_port: u16,
671 ) -> Result<(), RuntimeError> {
672 self.wait_for_log_marker(container_id, "DATABASE IS READY TO USE!", 240)
673 .await?;
674 self.wait_for_tcp(host_port, 30).await
675 }
676
677 async fn wait_for_sqlserver(
681 &self,
682 container_id: &str,
683 host_port: u16,
684 ) -> Result<(), RuntimeError> {
685 self.wait_for_log_marker(
686 container_id,
687 "SQL Server is now ready for client connections",
688 180,
689 )
690 .await?;
691 self.wait_for_tcp(host_port, 30).await
692 }
693
694 async fn wait_for_db2(&self, container_id: &str, host_port: u16) -> Result<(), RuntimeError> {
698 self.wait_for_log_marker(container_id, "Setup has completed", 360)
699 .await?;
700 self.wait_for_tcp(host_port, 60).await
701 }
702
703 async fn wait_for_log_marker(
706 &self,
707 container_id: &str,
708 marker: &str,
709 deadline_secs: u64,
710 ) -> Result<(), RuntimeError> {
711 let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
712 while std::time::Instant::now() < deadline {
713 let output = tokio::process::Command::new(&self.cli)
714 .args(["logs", container_id])
715 .output()
716 .await
717 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
718 let stdout = String::from_utf8_lossy(&output.stdout);
719 let stderr = String::from_utf8_lossy(&output.stderr);
720 if stdout.contains(marker) || stderr.contains(marker) {
721 return Ok(());
722 }
723 tokio::time::sleep(Duration::from_secs(2)).await;
724 }
725 Err(RuntimeError::ContainerStartFailed(format!(
726 "container did not log '{}' within {} seconds",
727 marker, deadline_secs
728 )))
729 }
730
731 async fn wait_for_tcp(&self, host_port: u16, deadline_secs: u64) -> Result<(), RuntimeError> {
735 let deadline = std::time::Instant::now() + Duration::from_secs(deadline_secs);
736 let host = self.net.sibling_host.as_str();
737 while std::time::Instant::now() < deadline {
738 if tokio::net::TcpStream::connect((host, host_port))
739 .await
740 .is_ok()
741 {
742 return Ok(());
743 }
744 tokio::time::sleep(Duration::from_millis(500)).await;
745 }
746 Err(RuntimeError::ContainerStartFailed(format!(
747 "TCP probe to {}:{} did not succeed within {}s",
748 host, host_port, deadline_secs
749 )))
750 }
751
752 async fn remove_container(&self, container_id: &str) {
753 let _ = tokio::process::Command::new(&self.cli)
754 .args(["rm", "-f", container_id])
755 .output()
756 .await;
757 }
758
759 pub async fn remove_data_volume(&self, account_id: &str, db_instance_identifier: &str) {
765 if self.k8s.is_some() || !db_volumes_enabled() {
766 return;
767 }
768 let name = data_volume_name(account_id, db_instance_identifier);
769 let _ = tokio::process::Command::new(&self.cli)
770 .args(["volume", "rm", "-f", &name])
771 .output()
772 .await;
773 }
774
775 pub(crate) async fn ensure_postgres_image(
796 &self,
797 major_version: &str,
798 ) -> Result<String, RuntimeError> {
799 let tag = bridge_image_tag("fakecloud-postgres", major_version);
800 self.ensure_bridge_image(&tag, |tag| async move {
801 self.build_postgres_image_local(major_version, &tag).await
802 })
803 .await
804 }
805
806 async fn docker_image_exists(&self, tag: &str) -> bool {
807 tokio::process::Command::new(&self.cli)
808 .args(["image", "inspect", tag])
809 .stdout(std::process::Stdio::null())
810 .stderr(std::process::Stdio::null())
811 .status()
812 .await
813 .map(|status| status.success())
814 .unwrap_or(false)
815 }
816
817 async fn try_pull_image(&self, tag: &str) -> bool {
818 tracing::info!(tag = %tag, "Pulling prebuilt fakecloud-postgres image");
819 let output = match tokio::process::Command::new(&self.cli)
820 .args(["pull", tag])
821 .output()
822 .await
823 {
824 Ok(output) => output,
825 Err(e) => {
826 tracing::debug!(tag = %tag, error = %e, "docker pull failed to spawn");
827 return false;
828 }
829 };
830 if output.status.success() {
831 return true;
832 }
833 tracing::info!(
834 tag = %tag,
835 stderr = %String::from_utf8_lossy(&output.stderr).trim(),
836 "Prebuilt postgres image not available, falling back to local build"
837 );
838 false
839 }
840
841 async fn build_postgres_image_local(
842 &self,
843 major_version: &str,
844 tag: &str,
845 ) -> Result<(), RuntimeError> {
846 let assets: [(&str, &str); 8] = [
847 ("Dockerfile", POSTGRES_DOCKERFILE),
848 ("aws_commons.control", AWS_COMMONS_CONTROL),
849 ("aws_commons--1.1.sql", AWS_COMMONS_SQL),
850 ("aws_commons--1.0--1.1.sql", AWS_COMMONS_UPGRADE_SQL),
851 ("aws_lambda.control", AWS_LAMBDA_CONTROL),
852 ("aws_lambda--1.0.sql", AWS_LAMBDA_SQL),
853 ("aws_s3.control", AWS_S3_CONTROL),
854 ("aws_s3--1.0.sql", AWS_S3_SQL),
855 ];
856 self.build_image_local(
857 tag,
858 &assets,
859 &format!("PG_VERSION={major_version}"),
860 "fakecloud-postgres",
861 )
862 .await
863 }
864
865 pub(crate) async fn ensure_mysql_image(
870 &self,
871 major_version: &str,
872 ) -> Result<String, RuntimeError> {
873 let tag = bridge_image_tag("fakecloud-mysql", major_version);
874 self.ensure_bridge_image(&tag, |tag| async move {
875 self.build_mysql_image_local(major_version, &tag).await
876 })
877 .await
878 }
879
880 pub(crate) async fn ensure_mariadb_image(
881 &self,
882 major_version: &str,
883 ) -> Result<String, RuntimeError> {
884 let tag = bridge_image_tag("fakecloud-mariadb", major_version);
885 self.ensure_bridge_image(&tag, |tag| async move {
886 self.build_mariadb_image_local(major_version, &tag).await
887 })
888 .await
889 }
890
891 async fn ensure_bridge_image<F, Fut>(
896 &self,
897 tag: &str,
898 build_local: F,
899 ) -> Result<String, RuntimeError>
900 where
901 F: FnOnce(String) -> Fut,
902 Fut: std::future::Future<Output = Result<(), RuntimeError>>,
903 {
904 let lock = {
905 let mut cache = self.image_cache.write();
906 cache
907 .entry(tag.to_string())
908 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(false)))
909 .clone()
910 };
911 let mut resolved = lock.lock().await;
912 if *resolved {
913 return Ok(tag.to_string());
914 }
915
916 let force_rebuild = std::env::var("FAKECLOUD_REBUILD_POSTGRES_IMAGE")
917 .map(|v| !v.is_empty())
918 .unwrap_or(false);
919
920 if !force_rebuild {
921 if self.docker_image_exists(tag).await {
922 *resolved = true;
923 return Ok(tag.to_string());
924 }
925 if self.try_pull_image(tag).await {
926 *resolved = true;
927 return Ok(tag.to_string());
928 }
929 }
930
931 build_local(tag.to_string()).await?;
932 *resolved = true;
933 Ok(tag.to_string())
934 }
935
936 async fn build_mysql_image_local(
937 &self,
938 major_version: &str,
939 tag: &str,
940 ) -> Result<(), RuntimeError> {
941 let assets: [(&str, &str); 4] = [
942 ("Dockerfile", MYSQL_DOCKERFILE),
943 ("fakecloud_udf.c", MYSQL_UDF_C),
944 ("fakecloud-bootstrap.sh", MYSQL_BOOTSTRAP_SH),
945 ("99-fakecloud-bootstrap.sql.tmpl", MYSQL_BOOTSTRAP_SQL),
946 ];
947 self.build_image_local(
948 tag,
949 &assets,
950 &format!("MYSQL_VERSION={major_version}"),
951 "fakecloud-mysql",
952 )
953 .await
954 }
955
956 async fn build_mariadb_image_local(
957 &self,
958 major_version: &str,
959 tag: &str,
960 ) -> Result<(), RuntimeError> {
961 let assets: [(&str, &str); 4] = [
962 ("Dockerfile", MARIADB_DOCKERFILE),
963 ("fakecloud_udf.c", MARIADB_UDF_C),
964 ("fakecloud-bootstrap.sh", MARIADB_BOOTSTRAP_SH),
965 ("99-fakecloud-bootstrap.sql.tmpl", MARIADB_BOOTSTRAP_SQL),
966 ];
967 self.build_image_local(
968 tag,
969 &assets,
970 &format!("MARIADB_VERSION={major_version}"),
971 "fakecloud-mariadb",
972 )
973 .await
974 }
975
976 async fn build_image_local(
977 &self,
978 tag: &str,
979 assets: &[(&str, &str)],
980 build_arg: &str,
981 image_label: &str,
982 ) -> Result<(), RuntimeError> {
983 let build_dir =
984 tempfile::tempdir().map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
985 for (name, contents) in assets {
986 tokio::fs::write(build_dir.path().join(name), contents)
987 .await
988 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
989 }
990
991 tracing::info!(
992 tag = %tag,
993 image = %image_label,
994 "Building {image_label} image locally (first use can take ~60s)"
995 );
996
997 let output = tokio::process::Command::new(&self.cli)
998 .args(["build", "--build-arg", build_arg, "-t", tag, "."])
999 .current_dir(build_dir.path())
1000 .output()
1001 .await
1002 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1003
1004 if !output.status.success() {
1005 return Err(RuntimeError::ContainerStartFailed(format!(
1006 "docker build for {} failed: {}",
1007 tag,
1008 String::from_utf8_lossy(&output.stderr).trim()
1009 )));
1010 }
1011
1012 Ok(())
1013 }
1014
1015 pub async fn dump_database(
1016 &self,
1017 db_instance_identifier: &str,
1018 engine: &str,
1019 username: &str,
1020 password: &str,
1021 db_name: &str,
1022 ) -> Result<Vec<u8>, RuntimeError> {
1023 let container = self
1024 .containers
1025 .read()
1026 .get(db_instance_identifier)
1027 .cloned()
1028 .ok_or(RuntimeError::Unavailable)?;
1029
1030 if let Some(k) = &self.k8s {
1031 return k
1032 .dump_database(&container.container_id, engine, username, password, db_name)
1033 .await;
1034 }
1035
1036 let args: Vec<String> = match engine {
1037 "mysql" | "mariadb" => vec![
1038 "exec".into(),
1039 container.container_id.clone(),
1040 "mysqldump".into(),
1041 "-u".into(),
1042 username.into(),
1043 format!("-p{password}"),
1044 db_name.into(),
1045 ],
1046 "postgres" => vec![
1047 "exec".into(),
1048 container.container_id.clone(),
1049 "pg_dump".into(),
1050 "-U".into(),
1051 username.into(),
1052 "-d".into(),
1053 db_name.into(),
1054 "--no-password".into(),
1055 ],
1056 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
1063 | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
1064 return Err(RuntimeError::ContainerStartFailed(format!(
1065 "engine {engine} is not yet supported by the snapshot/read-replica path; \
1066 emulator stores the API state but cannot dump the underlying database"
1067 )));
1068 }
1069 other => {
1070 return Err(RuntimeError::ContainerStartFailed(format!(
1071 "engine {other} is not supported by dump_database"
1072 )));
1073 }
1074 };
1075
1076 let output = tokio::process::Command::new(&self.cli)
1077 .args(&args)
1078 .output()
1079 .await
1080 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1081
1082 if !output.status.success() {
1083 return Err(RuntimeError::ContainerStartFailed(format!(
1084 "dump failed: {}",
1085 String::from_utf8_lossy(&output.stderr).trim()
1086 )));
1087 }
1088
1089 Ok(output.stdout)
1090 }
1091
1092 pub async fn read_log_file(
1098 &self,
1099 db_instance_identifier: &str,
1100 container_path: &str,
1101 ) -> Result<Vec<u8>, RuntimeError> {
1102 let container = self
1103 .containers
1104 .read()
1105 .get(db_instance_identifier)
1106 .cloned()
1107 .ok_or(RuntimeError::Unavailable)?;
1108
1109 if let Some(k) = &self.k8s {
1110 return k.read_file(&container.container_id, container_path).await;
1111 }
1112
1113 let output = tokio::process::Command::new(&self.cli)
1114 .args(["exec", &container.container_id, "cat", container_path])
1115 .output()
1116 .await
1117 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1118
1119 if !output.status.success() {
1120 return Err(RuntimeError::ContainerStartFailed(format!(
1121 "cat {container_path} failed: {}",
1122 String::from_utf8_lossy(&output.stderr).trim()
1123 )));
1124 }
1125
1126 Ok(output.stdout)
1127 }
1128
1129 pub async fn restore_database(
1130 &self,
1131 db_instance_identifier: &str,
1132 engine: &str,
1133 username: &str,
1134 password: &str,
1135 db_name: &str,
1136 dump_data: &[u8],
1137 ) -> Result<(), RuntimeError> {
1138 let container = self
1139 .containers
1140 .read()
1141 .get(db_instance_identifier)
1142 .cloned()
1143 .ok_or(RuntimeError::Unavailable)?;
1144
1145 if let Some(k) = &self.k8s {
1146 return k
1147 .restore_database(
1148 &container.container_id,
1149 engine,
1150 username,
1151 password,
1152 db_name,
1153 dump_data,
1154 )
1155 .await;
1156 }
1157
1158 let args: Vec<String> = match engine {
1159 "mysql" | "mariadb" => vec![
1160 "exec".into(),
1161 "-i".into(),
1162 container.container_id.clone(),
1163 "mysql".into(),
1164 "-u".into(),
1165 username.into(),
1166 format!("-p{password}"),
1167 db_name.into(),
1168 ],
1169 "postgres" => vec![
1170 "exec".into(),
1171 "-i".into(),
1172 container.container_id.clone(),
1173 "psql".into(),
1174 "-U".into(),
1175 username.into(),
1176 "-d".into(),
1177 db_name.into(),
1178 "--no-password".into(),
1179 "-v".into(),
1180 "ON_ERROR_STOP=1".into(),
1181 ],
1182 "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" | "sqlserver-ee"
1183 | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" | "db2-se" | "db2-ae" => {
1184 return Err(RuntimeError::ContainerStartFailed(format!(
1185 "engine {engine} is not yet supported by the snapshot-restore path"
1186 )));
1187 }
1188 other => {
1189 return Err(RuntimeError::ContainerStartFailed(format!(
1190 "engine {other} is not supported by restore_database"
1191 )));
1192 }
1193 };
1194
1195 let mut child = tokio::process::Command::new(&self.cli)
1196 .args(&args)
1197 .stdin(std::process::Stdio::piped())
1198 .stdout(std::process::Stdio::piped())
1199 .stderr(std::process::Stdio::piped())
1200 .spawn()
1201 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1202
1203 if let Some(mut stdin) = child.stdin.take() {
1204 use tokio::io::AsyncWriteExt;
1205 stdin
1206 .write_all(dump_data)
1207 .await
1208 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1209 drop(stdin);
1210 }
1211
1212 let output = child
1213 .wait_with_output()
1214 .await
1215 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
1216
1217 if !output.status.success() {
1218 return Err(RuntimeError::ContainerStartFailed(format!(
1219 "restore failed: {}",
1220 String::from_utf8_lossy(&output.stderr).trim()
1221 )));
1222 }
1223
1224 Ok(())
1225 }
1226}
1227
1228fn db_volumes_enabled() -> bool {
1233 std::env::var("FAKECLOUD_PERSIST_DB_VOLUMES")
1234 .map(|v| matches!(v.as_str(), "1" | "true" | "TRUE" | "yes"))
1235 .unwrap_or(false)
1236}
1237
1238fn engine_data_dir(engine: &str) -> Option<&'static str> {
1242 match engine {
1243 "postgres" => Some("/var/lib/postgresql/data"),
1244 "mysql" | "mariadb" => Some("/var/lib/mysql"),
1245 _ => None,
1246 }
1247}
1248
1249fn data_volume_name(account_id: &str, db_instance_identifier: &str) -> String {
1254 let sanitize = |s: &str| -> String {
1255 s.chars()
1256 .map(|c| {
1257 if c.is_ascii_alphanumeric() || c == '_' || c == '.' || c == '-' {
1258 c
1259 } else {
1260 '-'
1261 }
1262 })
1263 .collect()
1264 };
1265 format!(
1266 "fakecloud-rds-data-{}-{}",
1267 sanitize(account_id),
1268 sanitize(db_instance_identifier)
1269 )
1270}
1271
1272pub(crate) fn bridge_image_tag(image: &str, major_version: &str) -> String {
1281 let registry = std::env::var("FAKECLOUD_POSTGRES_REGISTRY")
1282 .unwrap_or_else(|_| DEFAULT_POSTGRES_REGISTRY.to_string());
1283 bridge_image_tag_with_registry(®istry, image, major_version)
1284}
1285
1286fn bridge_image_tag_with_registry(registry: &str, image: &str, major_version: &str) -> String {
1292 let registry = registry.trim_end_matches('/');
1293 format!(
1294 "{}/{}:{}-{}",
1295 registry,
1296 image,
1297 major_version,
1298 env!("CARGO_PKG_VERSION")
1299 )
1300}
1301
1302#[cfg(test)]
1303mod tests {
1304 use super::*;
1305
1306 #[test]
1307 fn engine_data_dir_only_maps_volume_friendly_engines() {
1308 assert_eq!(
1309 engine_data_dir("postgres"),
1310 Some("/var/lib/postgresql/data")
1311 );
1312 assert_eq!(engine_data_dir("mysql"), Some("/var/lib/mysql"));
1313 assert_eq!(engine_data_dir("mariadb"), Some("/var/lib/mysql"));
1314 assert_eq!(engine_data_dir("oracle-ee"), None);
1316 assert_eq!(engine_data_dir("sqlserver-ex"), None);
1317 assert_eq!(engine_data_dir("db2-se"), None);
1318 }
1319
1320 #[test]
1321 fn data_volume_name_is_stable_and_sanitized() {
1322 assert_eq!(
1325 data_volume_name("123456789012", "my-db"),
1326 "fakecloud-rds-data-123456789012-my-db"
1327 );
1328 assert_eq!(
1330 data_volume_name("123456789012", "weird/name:1"),
1331 "fakecloud-rds-data-123456789012-weird-name-1"
1332 );
1333 }
1334
1335 #[test]
1340 fn bridge_image_tag_resolves_registry_overrides() {
1341 assert_eq!(
1343 bridge_image_tag_with_registry(DEFAULT_POSTGRES_REGISTRY, "fakecloud-postgres", "16"),
1344 format!(
1345 "ghcr.io/faiscadev/fakecloud-postgres:16-{}",
1346 env!("CARGO_PKG_VERSION")
1347 )
1348 );
1349 assert_eq!(
1350 bridge_image_tag_with_registry(DEFAULT_POSTGRES_REGISTRY, "fakecloud-mysql", "8.0"),
1351 format!(
1352 "ghcr.io/faiscadev/fakecloud-mysql:8.0-{}",
1353 env!("CARGO_PKG_VERSION")
1354 )
1355 );
1356 assert_eq!(
1357 bridge_image_tag_with_registry(DEFAULT_POSTGRES_REGISTRY, "fakecloud-mariadb", "10.11"),
1358 format!(
1359 "ghcr.io/faiscadev/fakecloud-mariadb:10.11-{}",
1360 env!("CARGO_PKG_VERSION")
1361 )
1362 );
1363
1364 assert_eq!(
1366 bridge_image_tag_with_registry("registry.example.com/team", "fakecloud-postgres", "15"),
1367 format!(
1368 "registry.example.com/team/fakecloud-postgres:15-{}",
1369 env!("CARGO_PKG_VERSION")
1370 )
1371 );
1372
1373 assert_eq!(
1375 bridge_image_tag_with_registry(
1376 "registry.example.com/team/",
1377 "fakecloud-postgres",
1378 "13"
1379 ),
1380 format!(
1381 "registry.example.com/team/fakecloud-postgres:13-{}",
1382 env!("CARGO_PKG_VERSION")
1383 )
1384 );
1385 }
1386
1387 fn running_stub(container_id: &str) -> RunningDbContainer {
1388 RunningDbContainer {
1389 container_id: container_id.to_string(),
1390 host_port: 54321,
1391 endpoint_address: "127.0.0.1".to_string(),
1392 endpoint_port: 54321,
1393 }
1394 }
1395
1396 #[tokio::test]
1404 async fn stop_container_before_registration_is_a_noop_then_registration_leaks() {
1405 let rt = RdsRuntime::new_stub();
1406
1407 rt.stop_container("db-1").await;
1409 assert!(
1410 rt.containers.read().is_empty(),
1411 "nothing registered yet, stop is a no-op",
1412 );
1413
1414 rt.containers
1416 .write()
1417 .insert("db-1".to_string(), running_stub("container-abc"));
1418
1419 assert_eq!(
1423 rt.containers.read().len(),
1424 1,
1425 "the registered container leaks with no cleanup branch",
1426 );
1427 }
1428
1429 #[tokio::test]
1435 async fn stop_container_after_registration_reaps_orphan_on_delete_during_create() {
1436 let rt = RdsRuntime::new_stub();
1437
1438 rt.containers
1441 .write()
1442 .insert("db-1".to_string(), running_stub("container-abc"));
1443
1444 rt.stop_container("db-1").await;
1446
1447 assert!(
1448 rt.containers.read().is_empty(),
1449 "stop_container must reap the registered orphan: {:?}",
1450 rt.containers.read().keys().collect::<Vec<_>>(),
1451 );
1452 }
1453}