1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18 default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19 DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20 SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
23const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
25const SUPPORTED_ACTIONS: &[&str] = &[
26 "AddRoleToDBCluster",
27 "AddRoleToDBInstance",
28 "AddSourceIdentifierToSubscription",
29 "AddTagsToResource",
30 "ApplyPendingMaintenanceAction",
31 "AuthorizeDBSecurityGroupIngress",
32 "BacktrackDBCluster",
33 "CancelExportTask",
34 "CopyDBClusterParameterGroup",
35 "CopyDBClusterSnapshot",
36 "CopyDBParameterGroup",
37 "CopyDBSnapshot",
38 "CopyOptionGroup",
39 "CreateBlueGreenDeployment",
40 "CreateCustomDBEngineVersion",
41 "CreateDBCluster",
42 "CreateDBClusterEndpoint",
43 "CreateDBClusterParameterGroup",
44 "CreateDBClusterSnapshot",
45 "CreateDBInstance",
46 "CreateDBInstanceReadReplica",
47 "CreateDBParameterGroup",
48 "CreateDBProxy",
49 "CreateDBProxyEndpoint",
50 "CreateDBSecurityGroup",
51 "CreateDBShardGroup",
52 "CreateDBSnapshot",
53 "CreateDBSubnetGroup",
54 "CreateEventSubscription",
55 "CreateGlobalCluster",
56 "CreateIntegration",
57 "CreateOptionGroup",
58 "CreateTenantDatabase",
59 "DeleteBlueGreenDeployment",
60 "DeleteCustomDBEngineVersion",
61 "DeleteDBCluster",
62 "DeleteDBClusterAutomatedBackup",
63 "DeleteDBClusterEndpoint",
64 "DeleteDBClusterParameterGroup",
65 "DeleteDBClusterSnapshot",
66 "DeleteDBInstance",
67 "DeleteDBInstanceAutomatedBackup",
68 "DeleteDBParameterGroup",
69 "DeleteDBProxy",
70 "DeleteDBProxyEndpoint",
71 "DeleteDBSecurityGroup",
72 "DeleteDBShardGroup",
73 "DeleteDBSnapshot",
74 "DeleteDBSubnetGroup",
75 "DeleteEventSubscription",
76 "DeleteGlobalCluster",
77 "DeleteIntegration",
78 "DeleteOptionGroup",
79 "DeleteTenantDatabase",
80 "DeregisterDBProxyTargets",
81 "DescribeAccountAttributes",
82 "DescribeBlueGreenDeployments",
83 "DescribeCertificates",
84 "DescribeDBClusterAutomatedBackups",
85 "DescribeDBClusterBacktracks",
86 "DescribeDBClusterEndpoints",
87 "DescribeDBClusterParameterGroups",
88 "DescribeDBClusterParameters",
89 "DescribeDBClusterSnapshotAttributes",
90 "DescribeDBClusterSnapshots",
91 "DescribeDBClusters",
92 "DescribeDBEngineVersions",
93 "DescribeDBInstanceAutomatedBackups",
94 "DescribeDBInstances",
95 "DescribeDBLogFiles",
96 "DescribeDBMajorEngineVersions",
97 "DescribeDBParameterGroups",
98 "DescribeDBParameters",
99 "DescribeDBProxies",
100 "DescribeDBProxyEndpoints",
101 "DescribeDBProxyTargetGroups",
102 "DescribeDBProxyTargets",
103 "DescribeDBRecommendations",
104 "DescribeDBSecurityGroups",
105 "DescribeDBShardGroups",
106 "DescribeDBSnapshotAttributes",
107 "DescribeDBSnapshotTenantDatabases",
108 "DescribeDBSnapshots",
109 "DescribeDBSubnetGroups",
110 "DescribeEngineDefaultClusterParameters",
111 "DescribeEngineDefaultParameters",
112 "DescribeEventCategories",
113 "DescribeEventSubscriptions",
114 "DescribeEvents",
115 "DescribeExportTasks",
116 "DescribeGlobalClusters",
117 "DescribeIntegrations",
118 "DescribeOptionGroupOptions",
119 "DescribeOptionGroups",
120 "DescribeOrderableDBInstanceOptions",
121 "DescribePendingMaintenanceActions",
122 "DescribeReservedDBInstances",
123 "DescribeReservedDBInstancesOfferings",
124 "DescribeServerlessV2PlatformVersions",
125 "DescribeSourceRegions",
126 "DescribeTenantDatabases",
127 "DescribeValidDBInstanceModifications",
128 "DisableHttpEndpoint",
129 "DownloadDBLogFilePortion",
130 "EnableHttpEndpoint",
131 "FailoverDBCluster",
132 "FailoverGlobalCluster",
133 "ListTagsForResource",
134 "ModifyActivityStream",
135 "ModifyCertificates",
136 "ModifyCurrentDBClusterCapacity",
137 "ModifyCustomDBEngineVersion",
138 "ModifyDBCluster",
139 "ModifyDBClusterEndpoint",
140 "ModifyDBClusterParameterGroup",
141 "ModifyDBClusterSnapshotAttribute",
142 "ModifyDBInstance",
143 "ModifyDBParameterGroup",
144 "ModifyDBProxy",
145 "ModifyDBProxyEndpoint",
146 "ModifyDBProxyTargetGroup",
147 "ModifyDBRecommendation",
148 "ModifyDBShardGroup",
149 "ModifyDBSnapshot",
150 "ModifyDBSnapshotAttribute",
151 "ModifyDBSubnetGroup",
152 "ModifyEventSubscription",
153 "ModifyGlobalCluster",
154 "ModifyIntegration",
155 "ModifyOptionGroup",
156 "ModifyTenantDatabase",
157 "PromoteReadReplica",
158 "PromoteReadReplicaDBCluster",
159 "PurchaseReservedDBInstancesOffering",
160 "RebootDBCluster",
161 "RebootDBInstance",
162 "RebootDBShardGroup",
163 "RegisterDBProxyTargets",
164 "RemoveFromGlobalCluster",
165 "RemoveRoleFromDBCluster",
166 "RemoveRoleFromDBInstance",
167 "RemoveSourceIdentifierFromSubscription",
168 "RemoveTagsFromResource",
169 "ResetDBClusterParameterGroup",
170 "ResetDBParameterGroup",
171 "RestoreDBClusterFromS3",
172 "RestoreDBClusterFromSnapshot",
173 "RestoreDBClusterToPointInTime",
174 "RestoreDBInstanceFromDBSnapshot",
175 "RestoreDBInstanceFromS3",
176 "RestoreDBInstanceToPointInTime",
177 "RevokeDBSecurityGroupIngress",
178 "StartActivityStream",
179 "StartDBCluster",
180 "StartDBInstance",
181 "StartDBInstanceAutomatedBackupsReplication",
182 "StartExportTask",
183 "StopActivityStream",
184 "StopDBCluster",
185 "StopDBInstance",
186 "StopDBInstanceAutomatedBackupsReplication",
187 "SwitchoverBlueGreenDeployment",
188 "SwitchoverGlobalCluster",
189 "SwitchoverReadReplica",
190];
191
192pub struct RdsService {
193 pub(crate) state: SharedRdsState,
194 runtime: Option<Arc<RdsRuntime>>,
195 snapshot_store: Option<Arc<dyn SnapshotStore>>,
196 snapshot_lock: Arc<AsyncMutex<()>>,
197 pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
198}
199
200#[derive(Clone, Copy)]
202#[allow(dead_code, clippy::enum_variant_names)]
203pub(crate) enum RdsSourceType {
204 DbInstance,
205 DbSnapshot,
206 DbParameterGroup,
207 DbCluster,
208 DbClusterSnapshot,
209}
210
211impl RdsSourceType {
212 fn as_str(self) -> &'static str {
215 match self {
216 Self::DbInstance => "DB_INSTANCE",
217 Self::DbSnapshot => "DB_SNAPSHOT",
218 Self::DbParameterGroup => "DB_PARAMETER_GROUP",
219 Self::DbCluster => "DB_CLUSTER",
220 Self::DbClusterSnapshot => "DB_CLUSTER_SNAPSHOT",
221 }
222 }
223
224 pub(crate) fn describe_events_str(self) -> &'static str {
229 match self {
230 Self::DbInstance => "db-instance",
231 Self::DbSnapshot => "db-snapshot",
232 Self::DbParameterGroup => "db-parameter-group",
233 Self::DbCluster => "db-cluster",
234 Self::DbClusterSnapshot => "db-cluster-snapshot",
235 }
236 }
237
238 fn detail_type(self) -> &'static str {
239 match self {
240 Self::DbInstance => "RDS DB Instance Event",
241 Self::DbSnapshot => "RDS DB Snapshot Event",
242 Self::DbParameterGroup => "RDS DB Parameter Group Event",
243 Self::DbCluster => "RDS DB Cluster Event",
244 Self::DbClusterSnapshot => "RDS DB Cluster Snapshot Event",
245 }
246 }
247}
248
249mod cluster_snapshots;
250mod engine;
251mod instances;
252mod log_files;
253mod parameter_groups;
254mod replicas;
255mod restore;
256mod snapshots;
257mod subnet_groups;
258mod tags;
259
260impl RdsService {
261 pub(crate) fn state_handle(&self) -> &SharedRdsState {
262 &self.state
263 }
264}
265
266impl RdsService {
267 pub fn new(state: SharedRdsState) -> Self {
268 Self {
269 state,
270 runtime: None,
271 snapshot_store: None,
272 snapshot_lock: Arc::new(AsyncMutex::new(())),
273 delivery_bus: None,
274 }
275 }
276
277 pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
278 self.runtime = Some(runtime);
279 self
280 }
281
282 pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
286 self.runtime.as_ref()
287 }
288
289 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
290 self.snapshot_store = Some(store);
291 self
292 }
293
294 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
295 self.delivery_bus = Some(bus);
296 self
297 }
298
299 pub(crate) fn emit_event(
304 &self,
305 source_type: RdsSourceType,
306 source_identifier: &str,
307 source_arn: &str,
308 event_id: &str,
309 event_categories: &[&str],
310 message: &str,
311 ) {
312 let account_id = source_arn.split(':').nth(4).unwrap_or("");
315 emit_event_static_with_state(
316 self.delivery_bus.as_ref(),
317 Some(&self.state),
318 if account_id.is_empty() {
319 None
320 } else {
321 Some(account_id)
322 },
323 source_type,
324 source_identifier,
325 source_arn,
326 event_id,
327 event_categories,
328 message,
329 );
330 }
331
332 async fn save_snapshot(&self) {
333 save_snapshot_static(
334 self.state.clone(),
335 self.snapshot_store.clone(),
336 self.snapshot_lock.clone(),
337 )
338 .await;
339 }
340
341 pub async fn recover_persisted_containers(&self) {
350 let Some(runtime) = self.runtime.clone() else {
351 return;
352 };
353
354 struct Pending {
355 account_id: String,
356 region: String,
357 id: String,
358 arn: String,
359 engine: String,
360 engine_version: String,
361 username: String,
362 password: String,
363 db_name: String,
364 }
365
366 let pending: Vec<Pending> = {
367 let mut accounts = self.state.write();
368 let mut out = Vec::new();
369 for (_, state) in accounts.iter_mut() {
370 let account_id = state.account_id.clone();
371 let region = state.region.clone();
372 for (id, inst) in state.instances.iter_mut() {
373 if !matches!(
374 inst.db_instance_status.as_str(),
375 "available" | "starting" | "modifying" | "rebooting" | "backing-up"
376 ) {
377 continue;
378 }
379 inst.db_instance_status = "starting".to_string();
380 out.push(Pending {
381 account_id: account_id.clone(),
382 region: region.clone(),
383 id: id.clone(),
384 arn: inst.db_instance_arn.clone(),
385 engine: inst.engine.clone(),
386 engine_version: inst.engine_version.clone(),
387 username: inst.master_username.clone(),
388 password: inst.master_user_password.clone(),
389 db_name: inst
390 .db_name
391 .clone()
392 .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
393 });
394 }
395 }
396 out
397 };
398
399 if pending.is_empty() {
400 return;
401 }
402 tracing::info!(
403 count = pending.len(),
404 "recovering backing containers for persisted rds instances",
405 );
406
407 for p in pending {
408 let runtime = runtime.clone();
409 let state = self.state.clone();
410 let snapshot_store = self.snapshot_store.clone();
411 let snapshot_lock = self.snapshot_lock.clone();
412 let delivery_bus = self.delivery_bus.clone();
413 tokio::spawn(async move {
414 match runtime
415 .ensure_postgres(
416 &p.id,
417 &p.engine,
418 &p.engine_version,
419 &p.username,
420 &p.password,
421 &p.db_name,
422 &p.account_id,
423 &p.region,
424 )
425 .await
426 {
427 Ok(running) => {
428 {
429 let mut accounts = state.write();
430 if let Some(s) = accounts.get_mut(&p.account_id) {
431 if let Some(inst) = s.instances.get_mut(&p.id) {
432 inst.db_instance_status = "available".to_string();
433 inst.endpoint_address = "127.0.0.1".to_string();
434 inst.port = i32::from(running.host_port);
435 inst.host_port = running.host_port;
436 inst.container_id = running.container_id;
437 }
438 }
439 }
440 save_snapshot_static(
441 state.clone(),
442 snapshot_store.clone(),
443 snapshot_lock.clone(),
444 )
445 .await;
446 emit_event_static(
447 delivery_bus.as_ref(),
448 RdsSourceType::DbInstance,
449 &p.id,
450 &p.arn,
451 "RDS-EVENT-0088",
452 &["notification"],
453 "DB instance restarted after fakecloud restart",
454 );
455 }
456 Err(error) => {
457 tracing::error!(
458 %error,
459 db_instance_identifier = %p.id,
460 "failed to recover rds backing container after restart",
461 );
462 {
463 let mut accounts = state.write();
464 if let Some(s) = accounts.get_mut(&p.account_id) {
465 if let Some(inst) = s.instances.get_mut(&p.id) {
466 inst.db_instance_status = "failed".to_string();
467 }
468 }
469 }
470 save_snapshot_static(state, snapshot_store, snapshot_lock).await;
471 }
472 }
473 });
474 }
475 }
476
477 async fn stop_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
483 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
484
485 let arn = {
490 let accounts = self.state.read();
491 let empty = RdsState::new(&request.account_id, &request.region);
492 let state = accounts.get(&request.account_id).unwrap_or(&empty);
493 state
494 .instances
495 .get(&db_instance_identifier)
496 .map(|i| i.db_instance_arn.clone())
497 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
498 };
499
500 if let Some(runtime) = self.runtime.as_ref() {
501 runtime.stop_container(&db_instance_identifier).await;
502 }
503
504 let instance = {
505 let mut accounts = self.state.write();
506 let state = accounts.get_or_create(&request.account_id);
507 let inst = state
508 .instances
509 .get_mut(&db_instance_identifier)
510 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
511 inst.db_instance_status = "stopped".to_string();
512 inst.container_id = String::new();
513 inst.clone()
514 };
515
516 self.emit_event(
517 RdsSourceType::DbInstance,
518 &db_instance_identifier,
519 &arn,
520 "RDS-EVENT-0089",
521 &["notification"],
522 "DB instance stopped",
523 );
524
525 Ok(AwsResponse::xml(
526 StatusCode::OK,
527 query_response_xml(
528 "StopDBInstance",
529 RDS_NS,
530 &format!(
531 "<DBInstance>{}</DBInstance>",
532 db_instance_xml(&instance, Some("stopped"))
533 ),
534 &request.request_id,
535 ),
536 ))
537 }
538
539 async fn start_db_instance(
542 &self,
543 request: &AwsRequest,
544 ) -> Result<AwsResponse, AwsServiceError> {
545 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
546
547 let instance = {
551 let accounts = self.state.read();
552 let empty = RdsState::new(&request.account_id, &request.region);
553 let state = accounts.get(&request.account_id).unwrap_or(&empty);
554 state
555 .instances
556 .get(&db_instance_identifier)
557 .cloned()
558 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
559 };
560
561 {
564 let mut accounts = self.state.write();
565 let state = accounts.get_or_create(&request.account_id);
566 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
567 inst.db_instance_status = "starting".to_string();
568 }
569 }
570
571 let running = if let Some(runtime) = self.runtime.as_ref() {
572 match runtime
573 .ensure_postgres(
574 &db_instance_identifier,
575 &instance.engine,
576 &instance.engine_version,
577 &instance.master_username,
578 &instance.master_user_password,
579 instance
580 .db_name
581 .as_deref()
582 .unwrap_or(default_db_name(&instance.engine)),
583 &request.account_id,
584 &request.region,
585 )
586 .await
587 {
588 Ok(r) => Some(r),
589 Err(e) => {
590 let mut accounts = self.state.write();
594 let state = accounts.get_or_create(&request.account_id);
595 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
596 inst.db_instance_status = "stopped".to_string();
597 }
598 return Err(runtime_error_to_service_error(e));
599 }
600 }
601 } else {
602 {
607 let mut accounts = self.state.write();
608 let state = accounts.get_or_create(&request.account_id);
609 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
610 inst.db_instance_status = "stopped".to_string();
611 }
612 }
613 return Err(AwsServiceError::aws_error(
614 StatusCode::SERVICE_UNAVAILABLE,
615 "InternalFailure",
616 "Container runtime is not configured; cannot start DB instance",
617 ));
618 };
619
620 let instance = {
621 let mut accounts = self.state.write();
622 let state = accounts.get_or_create(&request.account_id);
623 let inst = state
624 .instances
625 .get_mut(&db_instance_identifier)
626 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
627 inst.db_instance_status = "available".to_string();
628 inst.endpoint_address = "127.0.0.1".to_string();
629 if let Some(r) = running {
630 inst.port = i32::from(r.host_port);
631 inst.host_port = r.host_port;
632 inst.container_id = r.container_id;
633 }
634 inst.clone()
635 };
636
637 self.emit_event(
638 RdsSourceType::DbInstance,
639 &db_instance_identifier,
640 &instance.db_instance_arn,
641 "RDS-EVENT-0088",
642 &["notification"],
643 "DB instance started",
644 );
645
646 Ok(AwsResponse::xml(
647 StatusCode::OK,
648 query_response_xml(
649 "StartDBInstance",
650 RDS_NS,
651 &format!(
652 "<DBInstance>{}</DBInstance>",
653 db_instance_xml(&instance, None)
654 ),
655 &request.request_id,
656 ),
657 ))
658 }
659}
660
661fn is_declared_add_tags_not_found(code: &str) -> bool {
669 matches!(
670 code,
671 "BlueGreenDeploymentNotFoundFault"
672 | "DBClusterNotFoundFault"
673 | "DBInstanceNotFound"
674 | "DBProxyEndpointNotFoundFault"
675 | "DBProxyNotFoundFault"
676 | "DBProxyTargetGroupNotFoundFault"
677 | "DBShardGroupNotFound"
678 | "DBSnapshotNotFound"
679 | "DBSnapshotTenantDatabaseNotFoundFault"
680 | "IntegrationNotFoundFault"
681 | "InvalidDBClusterEndpointStateFault"
682 | "InvalidDBClusterStateFault"
683 | "InvalidDBInstanceState"
684 | "TenantDatabaseNotFound"
685 )
686}
687
688async fn save_snapshot_static(
690 state: SharedRdsState,
691 store: Option<Arc<dyn SnapshotStore>>,
692 lock: Arc<AsyncMutex<()>>,
693) {
694 let Some(store) = store else {
695 return;
696 };
697 let _guard = lock.lock().await;
698 let snapshot = RdsSnapshot {
699 schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
700 state: None,
701 accounts: Some(state.read().clone()),
702 };
703 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
704 let bytes = serde_json::to_vec(&snapshot)
705 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
706 store.save(&bytes)
707 })
708 .await;
709 match join {
710 Ok(Ok(())) => {}
711 Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
712 Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
713 }
714}
715
716impl RdsService {
717 fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
728 self.runtime.as_ref().ok_or_else(|| {
729 AwsServiceError::aws_error(
730 StatusCode::SERVICE_UNAVAILABLE,
731 "InsufficientDBInstanceCapacity",
732 "Docker/Podman is required for RDS DB instances but is not available",
733 )
734 })
735 }
736}
737
738#[async_trait]
739impl AwsService for RdsService {
740 fn service_name(&self) -> &str {
741 "rds"
742 }
743
744 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
745 crate::validation::prevalidate(request.action.as_str(), &request)?;
750
751 let mutates = is_mutating_action(request.action.as_str());
752 let result = match request.action.as_str() {
753 "AddTagsToResource" => self.add_tags_to_resource(&request),
754 "CreateDBInstance" => self.create_db_instance(&request).await,
755 "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
756 "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
757 "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
758 "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
759 "DeleteDBInstance" => self.delete_db_instance(&request).await,
760 "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
761 "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
762 "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
763 "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
764 "DescribeDBInstances" => self.describe_db_instances(&request),
765 "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
766 "DescribeDBParameters" => self.describe_db_parameters_real(&request),
767 "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
768 "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
769 "DescribeOrderableDBInstanceOptions" => {
770 self.describe_orderable_db_instance_options(&request)
771 }
772 "ListTagsForResource" => self.list_tags_for_resource(&request),
773 "ModifyDBInstance" => self.modify_db_instance(&request),
774 "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
775 "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
776 "RebootDBInstance" => self.reboot_db_instance(&request).await,
777 "StartDBInstance" => self.start_db_instance(&request).await,
778 "StopDBInstance" => self.stop_db_instance(&request).await,
779 "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
780 "RestoreDBInstanceFromDBSnapshot" => {
781 self.restore_db_instance_from_db_snapshot(&request).await
782 }
783 "RestoreDBInstanceToPointInTime" => {
784 self.restore_db_instance_to_point_in_time(&request).await
785 }
786 "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
787 "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
788 "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
789 "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
790 "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
791 "RestoreDBClusterToPointInTime" => {
792 self.restore_db_cluster_to_point_in_time(&request).await
793 }
794 _ => self.handle_extra_action(&request),
795 };
796 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
797 self.save_snapshot().await;
798 }
799 result
800 }
801
802 fn supported_actions(&self) -> &[&str] {
803 SUPPORTED_ACTIONS
804 }
805}
806
807impl RdsService {}
808
809pub(crate) fn render_user_parameter_xml(name: &str, value: &str) -> String {
813 format!(
814 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>user</Source>\n <ApplyType>dynamic</ApplyType>\n <DataType>string</DataType>\n <IsModifiable>true</IsModifiable>\n </Parameter>\n",
815 xml_escape(name),
816 xml_escape(value),
817 )
818}
819
820pub(crate) fn render_engine_default_parameter_xml(
824 default: &crate::state::EngineDefaultParameter,
825) -> String {
826 format!(
827 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>engine-default</Source>\n <ApplyType>{}</ApplyType>\n <DataType>{}</DataType>\n <AllowedValues>{}</AllowedValues>\n <IsModifiable>{}</IsModifiable>\n </Parameter>\n",
828 xml_escape(default.name),
829 xml_escape(default.value),
830 xml_escape(default.apply_type),
831 xml_escape(default.data_type),
832 xml_escape(default.allowed_values),
833 default.is_modifiable,
834 )
835}
836
837pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<(String, String)> {
845 let mut out = Vec::new();
846 for prefix in ["Parameters.Parameter", "Parameters.member"] {
847 let mut index = 1;
848 loop {
849 let name_key = format!("{prefix}.{index}.ParameterName");
850 let value_key = format!("{prefix}.{index}.ParameterValue");
851 let name = optional_query_param(request, &name_key);
852 let value = optional_query_param(request, &value_key);
853 if name.is_none() && value.is_none() {
854 break;
855 }
856 if let (Some(n), Some(v)) = (name, value) {
857 if !n.is_empty() {
858 out.push((n, v));
859 }
860 }
861 index += 1;
862 }
863 }
864 out
865}
866
867fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
871 match (engine, log_file_name) {
872 (_, "error/postgres.log") => "/var/log/postgresql/postgresql.log".to_string(),
873 (_, "trace/postgres-trace.log") => "/var/log/postgresql/postgresql.log".to_string(),
874 ("mysql" | "mariadb", "error/mysql-error.log") => "/var/log/mysql/error.log".to_string(),
875 ("mysql" | "mariadb", "slowquery/mysql-slowquery.log") => {
876 "/var/log/mysql/slow.log".to_string()
877 }
878 _ => log_file_name.to_string(),
879 }
880}
881
882pub(crate) struct PaginationResult<T> {
883 items: Vec<T>,
884 next_marker: Option<String>,
885}
886
887fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
891 use serde_json::{json, Value};
892 let Some(map) = state.extras.get_mut("clusters") else {
893 return;
894 };
895 let Some(entry) = map.get_mut(cluster_id) else {
896 return;
897 };
898 let Some(obj) = entry.as_object_mut() else {
899 return;
900 };
901 let mut members: Vec<Value> = obj
902 .get("DBClusterMembers")
903 .and_then(|v| v.as_array())
904 .cloned()
905 .unwrap_or_default();
906 if members
907 .iter()
908 .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
909 {
910 return;
911 }
912 let has_writer = members
913 .iter()
914 .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
915 let promotion_tier = (members.len() as i64) + 1;
916 members.push(json!({
917 "DBInstanceIdentifier": instance_id,
918 "IsClusterWriter": !has_writer,
919 "DBClusterParameterGroupStatus": "in-sync",
920 "PromotionTier": promotion_tier,
921 }));
922 obj.insert("DBClusterMembers".to_string(), Value::Array(members));
923 if !has_writer {
924 obj.insert(
925 "WriterDBInstanceIdentifier".to_string(),
926 Value::String(instance_id.to_string()),
927 );
928 }
929}
930
931#[path = "../service_helpers.rs"]
932mod service_helpers;
933pub(crate) use service_helpers::*;
934
935#[cfg(test)]
936#[path = "../service_tests.rs"]
937mod tests;