1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18 default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19 DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20 SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
23const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
25const SUPPORTED_ACTIONS: &[&str] = &[
26 "AddRoleToDBCluster",
27 "AddRoleToDBInstance",
28 "AddSourceIdentifierToSubscription",
29 "AddTagsToResource",
30 "ApplyPendingMaintenanceAction",
31 "AuthorizeDBSecurityGroupIngress",
32 "BacktrackDBCluster",
33 "CancelExportTask",
34 "CopyDBClusterParameterGroup",
35 "CopyDBClusterSnapshot",
36 "CopyDBParameterGroup",
37 "CopyDBSnapshot",
38 "CopyOptionGroup",
39 "CreateBlueGreenDeployment",
40 "CreateCustomDBEngineVersion",
41 "CreateDBCluster",
42 "CreateDBClusterEndpoint",
43 "CreateDBClusterParameterGroup",
44 "CreateDBClusterSnapshot",
45 "CreateDBInstance",
46 "CreateDBInstanceReadReplica",
47 "CreateDBParameterGroup",
48 "CreateDBProxy",
49 "CreateDBProxyEndpoint",
50 "CreateDBSecurityGroup",
51 "CreateDBShardGroup",
52 "CreateDBSnapshot",
53 "CreateDBSubnetGroup",
54 "CreateEventSubscription",
55 "CreateGlobalCluster",
56 "CreateIntegration",
57 "CreateOptionGroup",
58 "CreateTenantDatabase",
59 "DeleteBlueGreenDeployment",
60 "DeleteCustomDBEngineVersion",
61 "DeleteDBCluster",
62 "DeleteDBClusterAutomatedBackup",
63 "DeleteDBClusterEndpoint",
64 "DeleteDBClusterParameterGroup",
65 "DeleteDBClusterSnapshot",
66 "DeleteDBInstance",
67 "DeleteDBInstanceAutomatedBackup",
68 "DeleteDBParameterGroup",
69 "DeleteDBProxy",
70 "DeleteDBProxyEndpoint",
71 "DeleteDBSecurityGroup",
72 "DeleteDBShardGroup",
73 "DeleteDBSnapshot",
74 "DeleteDBSubnetGroup",
75 "DeleteEventSubscription",
76 "DeleteGlobalCluster",
77 "DeleteIntegration",
78 "DeleteOptionGroup",
79 "DeleteTenantDatabase",
80 "DeregisterDBProxyTargets",
81 "DescribeAccountAttributes",
82 "DescribeBlueGreenDeployments",
83 "DescribeCertificates",
84 "DescribeDBClusterAutomatedBackups",
85 "DescribeDBClusterBacktracks",
86 "DescribeDBClusterEndpoints",
87 "DescribeDBClusterParameterGroups",
88 "DescribeDBClusterParameters",
89 "DescribeDBClusterSnapshotAttributes",
90 "DescribeDBClusterSnapshots",
91 "DescribeDBClusters",
92 "DescribeDBEngineVersions",
93 "DescribeDBInstanceAutomatedBackups",
94 "DescribeDBInstances",
95 "DescribeDBLogFiles",
96 "DescribeDBMajorEngineVersions",
97 "DescribeDBParameterGroups",
98 "DescribeDBParameters",
99 "DescribeDBProxies",
100 "DescribeDBProxyEndpoints",
101 "DescribeDBProxyTargetGroups",
102 "DescribeDBProxyTargets",
103 "DescribeDBRecommendations",
104 "DescribeDBSecurityGroups",
105 "DescribeDBShardGroups",
106 "DescribeDBSnapshotAttributes",
107 "DescribeDBSnapshotTenantDatabases",
108 "DescribeDBSnapshots",
109 "DescribeDBSubnetGroups",
110 "DescribeEngineDefaultClusterParameters",
111 "DescribeEngineDefaultParameters",
112 "DescribeEventCategories",
113 "DescribeEventSubscriptions",
114 "DescribeEvents",
115 "DescribeExportTasks",
116 "DescribeGlobalClusters",
117 "DescribeIntegrations",
118 "DescribeOptionGroupOptions",
119 "DescribeOptionGroups",
120 "DescribeOrderableDBInstanceOptions",
121 "DescribePendingMaintenanceActions",
122 "DescribeReservedDBInstances",
123 "DescribeReservedDBInstancesOfferings",
124 "DescribeServerlessV2PlatformVersions",
125 "DescribeSourceRegions",
126 "DescribeTenantDatabases",
127 "DescribeValidDBInstanceModifications",
128 "DisableHttpEndpoint",
129 "DownloadDBLogFilePortion",
130 "EnableHttpEndpoint",
131 "FailoverDBCluster",
132 "FailoverGlobalCluster",
133 "ListTagsForResource",
134 "ModifyActivityStream",
135 "ModifyCertificates",
136 "ModifyCurrentDBClusterCapacity",
137 "ModifyCustomDBEngineVersion",
138 "ModifyDBCluster",
139 "ModifyDBClusterEndpoint",
140 "ModifyDBClusterParameterGroup",
141 "ModifyDBClusterSnapshotAttribute",
142 "ModifyDBInstance",
143 "ModifyDBParameterGroup",
144 "ModifyDBProxy",
145 "ModifyDBProxyEndpoint",
146 "ModifyDBProxyTargetGroup",
147 "ModifyDBRecommendation",
148 "ModifyDBShardGroup",
149 "ModifyDBSnapshot",
150 "ModifyDBSnapshotAttribute",
151 "ModifyDBSubnetGroup",
152 "ModifyEventSubscription",
153 "ModifyGlobalCluster",
154 "ModifyIntegration",
155 "ModifyOptionGroup",
156 "ModifyTenantDatabase",
157 "PromoteReadReplica",
158 "PromoteReadReplicaDBCluster",
159 "PurchaseReservedDBInstancesOffering",
160 "RebootDBCluster",
161 "RebootDBInstance",
162 "RebootDBShardGroup",
163 "RegisterDBProxyTargets",
164 "RemoveFromGlobalCluster",
165 "RemoveRoleFromDBCluster",
166 "RemoveRoleFromDBInstance",
167 "RemoveSourceIdentifierFromSubscription",
168 "RemoveTagsFromResource",
169 "ResetDBClusterParameterGroup",
170 "ResetDBParameterGroup",
171 "RestoreDBClusterFromS3",
172 "RestoreDBClusterFromSnapshot",
173 "RestoreDBClusterToPointInTime",
174 "RestoreDBInstanceFromDBSnapshot",
175 "RestoreDBInstanceFromS3",
176 "RestoreDBInstanceToPointInTime",
177 "RevokeDBSecurityGroupIngress",
178 "StartActivityStream",
179 "StartDBCluster",
180 "StartDBInstance",
181 "StartDBInstanceAutomatedBackupsReplication",
182 "StartExportTask",
183 "StopActivityStream",
184 "StopDBCluster",
185 "StopDBInstance",
186 "StopDBInstanceAutomatedBackupsReplication",
187 "SwitchoverBlueGreenDeployment",
188 "SwitchoverGlobalCluster",
189 "SwitchoverReadReplica",
190];
191
192pub struct RdsService {
193 pub(crate) state: SharedRdsState,
194 runtime: Option<Arc<RdsRuntime>>,
195 snapshot_store: Option<Arc<dyn SnapshotStore>>,
196 snapshot_lock: Arc<AsyncMutex<()>>,
197 pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
198}
199
200#[derive(Clone, Copy)]
202#[allow(dead_code, clippy::enum_variant_names)]
203pub(crate) enum RdsSourceType {
204 DbInstance,
205 DbSnapshot,
206 DbParameterGroup,
207 DbCluster,
208 DbClusterSnapshot,
209}
210
211impl RdsSourceType {
212 fn as_str(self) -> &'static str {
215 match self {
216 Self::DbInstance => "DB_INSTANCE",
217 Self::DbSnapshot => "DB_SNAPSHOT",
218 Self::DbParameterGroup => "DB_PARAMETER_GROUP",
219 Self::DbCluster => "DB_CLUSTER",
220 Self::DbClusterSnapshot => "DB_CLUSTER_SNAPSHOT",
221 }
222 }
223
224 pub(crate) fn describe_events_str(self) -> &'static str {
229 match self {
230 Self::DbInstance => "db-instance",
231 Self::DbSnapshot => "db-snapshot",
232 Self::DbParameterGroup => "db-parameter-group",
233 Self::DbCluster => "db-cluster",
234 Self::DbClusterSnapshot => "db-cluster-snapshot",
235 }
236 }
237
238 fn detail_type(self) -> &'static str {
239 match self {
240 Self::DbInstance => "RDS DB Instance Event",
241 Self::DbSnapshot => "RDS DB Snapshot Event",
242 Self::DbParameterGroup => "RDS DB Parameter Group Event",
243 Self::DbCluster => "RDS DB Cluster Event",
244 Self::DbClusterSnapshot => "RDS DB Cluster Snapshot Event",
245 }
246 }
247}
248
249mod cluster_snapshots;
250mod engine;
251mod instances;
252mod log_files;
253mod parameter_groups;
254mod replicas;
255mod restore;
256mod snapshots;
257mod subnet_groups;
258mod tags;
259
260impl RdsService {
261 pub(crate) fn state_handle(&self) -> &SharedRdsState {
262 &self.state
263 }
264}
265
266impl RdsService {
267 pub fn new(state: SharedRdsState) -> Self {
268 Self {
269 state,
270 runtime: None,
271 snapshot_store: None,
272 snapshot_lock: Arc::new(AsyncMutex::new(())),
273 delivery_bus: None,
274 }
275 }
276
277 pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
278 self.runtime = Some(runtime);
279 self
280 }
281
282 pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
286 self.runtime.as_ref()
287 }
288
289 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
290 self.snapshot_store = Some(store);
291 self
292 }
293
294 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
295 self.delivery_bus = Some(bus);
296 self
297 }
298
299 pub(crate) fn emit_event(
304 &self,
305 source_type: RdsSourceType,
306 source_identifier: &str,
307 source_arn: &str,
308 event_id: &str,
309 event_categories: &[&str],
310 message: &str,
311 ) {
312 let account_id = source_arn.split(':').nth(4).unwrap_or("");
315 emit_event_static_with_state(
316 self.delivery_bus.as_ref(),
317 Some(&self.state),
318 if account_id.is_empty() {
319 None
320 } else {
321 Some(account_id)
322 },
323 source_type,
324 source_identifier,
325 source_arn,
326 event_id,
327 event_categories,
328 message,
329 );
330 }
331
332 async fn save_snapshot(&self) {
333 save_snapshot_static(
334 self.state.clone(),
335 self.snapshot_store.clone(),
336 self.snapshot_lock.clone(),
337 )
338 .await;
339 }
340
341 pub async fn recover_persisted_containers(&self) {
350 let Some(runtime) = self.runtime.clone() else {
351 return;
352 };
353
354 struct Pending {
355 account_id: String,
356 region: String,
357 id: String,
358 arn: String,
359 engine: String,
360 engine_version: String,
361 username: String,
362 password: String,
363 db_name: String,
364 }
365
366 let pending: Vec<Pending> = {
367 let mut accounts = self.state.write();
368 let mut out = Vec::new();
369 for (_, state) in accounts.iter_mut() {
370 let account_id = state.account_id.clone();
371 let region = state.region.clone();
372 for (id, inst) in state.instances.iter_mut() {
373 if !matches!(
374 inst.db_instance_status.as_str(),
375 "available" | "starting" | "modifying" | "rebooting" | "backing-up"
376 ) {
377 continue;
378 }
379 inst.db_instance_status = "starting".to_string();
380 out.push(Pending {
381 account_id: account_id.clone(),
382 region: region.clone(),
383 id: id.clone(),
384 arn: inst.db_instance_arn.clone(),
385 engine: inst.engine.clone(),
386 engine_version: inst.engine_version.clone(),
387 username: inst.master_username.clone(),
388 password: inst.master_user_password.clone(),
389 db_name: inst
390 .db_name
391 .clone()
392 .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
393 });
394 }
395 }
396 out
397 };
398
399 if pending.is_empty() {
400 return;
401 }
402 tracing::info!(
403 count = pending.len(),
404 "recovering backing containers for persisted rds instances",
405 );
406
407 for p in pending {
408 let runtime = runtime.clone();
409 let state = self.state.clone();
410 let snapshot_store = self.snapshot_store.clone();
411 let snapshot_lock = self.snapshot_lock.clone();
412 let delivery_bus = self.delivery_bus.clone();
413 tokio::spawn(async move {
414 match runtime
415 .ensure_postgres(
416 &p.id,
417 &p.engine,
418 &p.engine_version,
419 &p.username,
420 &p.password,
421 &p.db_name,
422 &p.account_id,
423 &p.region,
424 )
425 .await
426 {
427 Ok(running) => {
428 {
429 let mut accounts = state.write();
430 if let Some(s) = accounts.get_mut(&p.account_id) {
431 if let Some(inst) = s.instances.get_mut(&p.id) {
432 inst.db_instance_status = "available".to_string();
433 inst.endpoint_address = running.endpoint_address.clone();
434 inst.port = i32::from(running.endpoint_port);
435 inst.host_port = running.host_port;
436 inst.container_id = running.container_id;
437 }
438 }
439 }
440 save_snapshot_static(
441 state.clone(),
442 snapshot_store.clone(),
443 snapshot_lock.clone(),
444 )
445 .await;
446 emit_event_static(
447 delivery_bus.as_ref(),
448 RdsSourceType::DbInstance,
449 &p.id,
450 &p.arn,
451 "RDS-EVENT-0088",
452 &["notification"],
453 "DB instance restarted after fakecloud restart",
454 );
455 }
456 Err(error) => {
457 tracing::error!(
458 %error,
459 db_instance_identifier = %p.id,
460 "failed to recover rds backing container after restart",
461 );
462 {
463 let mut accounts = state.write();
464 if let Some(s) = accounts.get_mut(&p.account_id) {
465 if let Some(inst) = s.instances.get_mut(&p.id) {
466 inst.db_instance_status = "failed".to_string();
467 }
468 }
469 }
470 save_snapshot_static(state, snapshot_store, snapshot_lock).await;
471 }
472 }
473 });
474 }
475 }
476
477 async fn stop_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
483 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
484
485 let arn = {
490 let accounts = self.state.read();
491 let empty = RdsState::new(&request.account_id, &request.region);
492 let state = accounts.get(&request.account_id).unwrap_or(&empty);
493 state
494 .instances
495 .get(&db_instance_identifier)
496 .map(|i| i.db_instance_arn.clone())
497 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
498 };
499
500 if let Some(runtime) = self.runtime.as_ref() {
501 runtime.stop_container(&db_instance_identifier).await;
502 }
503
504 let instance = {
505 let mut accounts = self.state.write();
506 let state = accounts.get_or_create(&request.account_id);
507 let inst = state
508 .instances
509 .get_mut(&db_instance_identifier)
510 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
511 inst.db_instance_status = "stopped".to_string();
512 inst.container_id = String::new();
513 inst.clone()
514 };
515
516 self.emit_event(
517 RdsSourceType::DbInstance,
518 &db_instance_identifier,
519 &arn,
520 "RDS-EVENT-0089",
521 &["notification"],
522 "DB instance stopped",
523 );
524
525 Ok(AwsResponse::xml(
526 StatusCode::OK,
527 query_response_xml(
528 "StopDBInstance",
529 RDS_NS,
530 &format!(
531 "<DBInstance>{}</DBInstance>",
532 db_instance_xml(&instance, Some("stopped"))
533 ),
534 &request.request_id,
535 ),
536 ))
537 }
538
539 async fn start_db_instance(
542 &self,
543 request: &AwsRequest,
544 ) -> Result<AwsResponse, AwsServiceError> {
545 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
546
547 let instance = {
551 let accounts = self.state.read();
552 let empty = RdsState::new(&request.account_id, &request.region);
553 let state = accounts.get(&request.account_id).unwrap_or(&empty);
554 state
555 .instances
556 .get(&db_instance_identifier)
557 .cloned()
558 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
559 };
560
561 {
564 let mut accounts = self.state.write();
565 let state = accounts.get_or_create(&request.account_id);
566 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
567 inst.db_instance_status = "starting".to_string();
568 }
569 }
570
571 let running = if let Some(runtime) = self.runtime.as_ref() {
572 match runtime
573 .ensure_postgres(
574 &db_instance_identifier,
575 &instance.engine,
576 &instance.engine_version,
577 &instance.master_username,
578 &instance.master_user_password,
579 instance
580 .db_name
581 .as_deref()
582 .unwrap_or(default_db_name(&instance.engine)),
583 &request.account_id,
584 &request.region,
585 )
586 .await
587 {
588 Ok(r) => Some(r),
589 Err(e) => {
590 let mut accounts = self.state.write();
594 let state = accounts.get_or_create(&request.account_id);
595 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
596 inst.db_instance_status = "stopped".to_string();
597 }
598 return Err(runtime_error_to_service_error(e));
599 }
600 }
601 } else {
602 {
607 let mut accounts = self.state.write();
608 let state = accounts.get_or_create(&request.account_id);
609 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
610 inst.db_instance_status = "stopped".to_string();
611 }
612 }
613 return Err(AwsServiceError::aws_error(
614 StatusCode::SERVICE_UNAVAILABLE,
615 "InternalFailure",
616 "Container runtime is not configured; cannot start DB instance",
617 ));
618 };
619
620 let instance = {
621 let mut accounts = self.state.write();
622 let state = accounts.get_or_create(&request.account_id);
623 let inst = state
624 .instances
625 .get_mut(&db_instance_identifier)
626 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
627 inst.db_instance_status = "available".to_string();
628 inst.endpoint_address = "127.0.0.1".to_string();
629 if let Some(r) = running {
630 inst.endpoint_address = r.endpoint_address.clone();
631 inst.port = i32::from(r.endpoint_port);
632 inst.host_port = r.host_port;
633 inst.container_id = r.container_id;
634 }
635 inst.clone()
636 };
637
638 self.emit_event(
639 RdsSourceType::DbInstance,
640 &db_instance_identifier,
641 &instance.db_instance_arn,
642 "RDS-EVENT-0088",
643 &["notification"],
644 "DB instance started",
645 );
646
647 Ok(AwsResponse::xml(
648 StatusCode::OK,
649 query_response_xml(
650 "StartDBInstance",
651 RDS_NS,
652 &format!(
653 "<DBInstance>{}</DBInstance>",
654 db_instance_xml(&instance, None)
655 ),
656 &request.request_id,
657 ),
658 ))
659 }
660}
661
662fn is_declared_add_tags_not_found(code: &str) -> bool {
670 matches!(
671 code,
672 "BlueGreenDeploymentNotFoundFault"
673 | "DBClusterNotFoundFault"
674 | "DBInstanceNotFound"
675 | "DBProxyEndpointNotFoundFault"
676 | "DBProxyNotFoundFault"
677 | "DBProxyTargetGroupNotFoundFault"
678 | "DBShardGroupNotFound"
679 | "DBSnapshotNotFound"
680 | "DBSnapshotTenantDatabaseNotFoundFault"
681 | "IntegrationNotFoundFault"
682 | "InvalidDBClusterEndpointStateFault"
683 | "InvalidDBClusterStateFault"
684 | "InvalidDBInstanceState"
685 | "TenantDatabaseNotFound"
686 )
687}
688
689async fn save_snapshot_static(
691 state: SharedRdsState,
692 store: Option<Arc<dyn SnapshotStore>>,
693 lock: Arc<AsyncMutex<()>>,
694) {
695 let Some(store) = store else {
696 return;
697 };
698 let _guard = lock.lock().await;
699 let snapshot = RdsSnapshot {
700 schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
701 state: None,
702 accounts: Some(state.read().clone()),
703 };
704 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
705 let bytes = serde_json::to_vec(&snapshot)
706 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
707 store.save(&bytes)
708 })
709 .await;
710 match join {
711 Ok(Ok(())) => {}
712 Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
713 Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
714 }
715}
716
717impl RdsService {
718 fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
729 self.runtime.as_ref().ok_or_else(|| {
730 AwsServiceError::aws_error(
731 StatusCode::SERVICE_UNAVAILABLE,
732 "InsufficientDBInstanceCapacity",
733 "Docker/Podman is required for RDS DB instances but is not available",
734 )
735 })
736 }
737}
738
739#[async_trait]
740impl AwsService for RdsService {
741 fn service_name(&self) -> &str {
742 "rds"
743 }
744
745 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
746 crate::validation::prevalidate(request.action.as_str(), &request)?;
751
752 let mutates = is_mutating_action(request.action.as_str());
753 let result = match request.action.as_str() {
754 "AddTagsToResource" => self.add_tags_to_resource(&request),
755 "CreateDBInstance" => self.create_db_instance(&request).await,
756 "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
757 "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
758 "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
759 "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
760 "DeleteDBInstance" => self.delete_db_instance(&request).await,
761 "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
762 "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
763 "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
764 "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
765 "DescribeDBInstances" => self.describe_db_instances(&request),
766 "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
767 "DescribeDBParameters" => self.describe_db_parameters_real(&request),
768 "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
769 "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
770 "DescribeOrderableDBInstanceOptions" => {
771 self.describe_orderable_db_instance_options(&request)
772 }
773 "ListTagsForResource" => self.list_tags_for_resource(&request),
774 "ModifyDBInstance" => self.modify_db_instance(&request),
775 "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
776 "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
777 "RebootDBInstance" => self.reboot_db_instance(&request).await,
778 "StartDBInstance" => self.start_db_instance(&request).await,
779 "StopDBInstance" => self.stop_db_instance(&request).await,
780 "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
781 "RestoreDBInstanceFromDBSnapshot" => {
782 self.restore_db_instance_from_db_snapshot(&request).await
783 }
784 "RestoreDBInstanceToPointInTime" => {
785 self.restore_db_instance_to_point_in_time(&request).await
786 }
787 "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
788 "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
789 "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
790 "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
791 "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
792 "RestoreDBClusterToPointInTime" => {
793 self.restore_db_cluster_to_point_in_time(&request).await
794 }
795 _ => self.handle_extra_action(&request),
796 };
797 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
798 self.save_snapshot().await;
799 }
800 result
801 }
802
803 fn supported_actions(&self) -> &[&str] {
804 SUPPORTED_ACTIONS
805 }
806}
807
808impl RdsService {}
809
810pub(crate) fn render_user_parameter_xml(name: &str, value: &str) -> String {
814 format!(
815 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>user</Source>\n <ApplyType>dynamic</ApplyType>\n <DataType>string</DataType>\n <IsModifiable>true</IsModifiable>\n </Parameter>\n",
816 xml_escape(name),
817 xml_escape(value),
818 )
819}
820
821pub(crate) fn render_engine_default_parameter_xml(
825 default: &crate::state::EngineDefaultParameter,
826) -> String {
827 format!(
828 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>engine-default</Source>\n <ApplyType>{}</ApplyType>\n <DataType>{}</DataType>\n <AllowedValues>{}</AllowedValues>\n <IsModifiable>{}</IsModifiable>\n </Parameter>\n",
829 xml_escape(default.name),
830 xml_escape(default.value),
831 xml_escape(default.apply_type),
832 xml_escape(default.data_type),
833 xml_escape(default.allowed_values),
834 default.is_modifiable,
835 )
836}
837
838pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<(String, String)> {
846 let mut out = Vec::new();
847 for prefix in ["Parameters.Parameter", "Parameters.member"] {
848 let mut index = 1;
849 loop {
850 let name_key = format!("{prefix}.{index}.ParameterName");
851 let value_key = format!("{prefix}.{index}.ParameterValue");
852 let name = optional_query_param(request, &name_key);
853 let value = optional_query_param(request, &value_key);
854 if name.is_none() && value.is_none() {
855 break;
856 }
857 if let (Some(n), Some(v)) = (name, value) {
858 if !n.is_empty() {
859 out.push((n, v));
860 }
861 }
862 index += 1;
863 }
864 }
865 out
866}
867
868fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
872 match (engine, log_file_name) {
873 (_, "error/postgres.log") => "/var/log/postgresql/postgresql.log".to_string(),
874 (_, "trace/postgres-trace.log") => "/var/log/postgresql/postgresql.log".to_string(),
875 ("mysql" | "mariadb", "error/mysql-error.log") => "/var/log/mysql/error.log".to_string(),
876 ("mysql" | "mariadb", "slowquery/mysql-slowquery.log") => {
877 "/var/log/mysql/slow.log".to_string()
878 }
879 _ => log_file_name.to_string(),
880 }
881}
882
883pub(crate) struct PaginationResult<T> {
884 items: Vec<T>,
885 next_marker: Option<String>,
886}
887
888fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
892 use serde_json::{json, Value};
893 let Some(map) = state.extras.get_mut("clusters") else {
894 return;
895 };
896 let Some(entry) = map.get_mut(cluster_id) else {
897 return;
898 };
899 let Some(obj) = entry.as_object_mut() else {
900 return;
901 };
902 let mut members: Vec<Value> = obj
903 .get("DBClusterMembers")
904 .and_then(|v| v.as_array())
905 .cloned()
906 .unwrap_or_default();
907 if members
908 .iter()
909 .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
910 {
911 return;
912 }
913 let has_writer = members
914 .iter()
915 .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
916 let promotion_tier = (members.len() as i64) + 1;
917 members.push(json!({
918 "DBInstanceIdentifier": instance_id,
919 "IsClusterWriter": !has_writer,
920 "DBClusterParameterGroupStatus": "in-sync",
921 "PromotionTier": promotion_tier,
922 }));
923 obj.insert("DBClusterMembers".to_string(), Value::Array(members));
924 if !has_writer {
925 obj.insert(
926 "WriterDBInstanceIdentifier".to_string(),
927 Value::String(instance_id.to_string()),
928 );
929 }
930}
931
932#[path = "../service_helpers.rs"]
933mod service_helpers;
934pub(crate) use service_helpers::*;
935
936#[cfg(test)]
937#[path = "../service_tests.rs"]
938mod tests;