1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18 default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19 DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20 SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
23const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
25const SUPPORTED_ACTIONS: &[&str] = &[
26 "AddRoleToDBCluster",
27 "AddRoleToDBInstance",
28 "AddSourceIdentifierToSubscription",
29 "AddTagsToResource",
30 "ApplyPendingMaintenanceAction",
31 "AuthorizeDBSecurityGroupIngress",
32 "BacktrackDBCluster",
33 "CancelExportTask",
34 "CopyDBClusterParameterGroup",
35 "CopyDBClusterSnapshot",
36 "CopyDBParameterGroup",
37 "CopyDBSnapshot",
38 "CopyOptionGroup",
39 "CreateBlueGreenDeployment",
40 "CreateCustomDBEngineVersion",
41 "CreateDBCluster",
42 "CreateDBClusterEndpoint",
43 "CreateDBClusterParameterGroup",
44 "CreateDBClusterSnapshot",
45 "CreateDBInstance",
46 "CreateDBInstanceReadReplica",
47 "CreateDBParameterGroup",
48 "CreateDBProxy",
49 "CreateDBProxyEndpoint",
50 "CreateDBSecurityGroup",
51 "CreateDBShardGroup",
52 "CreateDBSnapshot",
53 "CreateDBSubnetGroup",
54 "CreateEventSubscription",
55 "CreateGlobalCluster",
56 "CreateIntegration",
57 "CreateOptionGroup",
58 "CreateTenantDatabase",
59 "DeleteBlueGreenDeployment",
60 "DeleteCustomDBEngineVersion",
61 "DeleteDBCluster",
62 "DeleteDBClusterAutomatedBackup",
63 "DeleteDBClusterEndpoint",
64 "DeleteDBClusterParameterGroup",
65 "DeleteDBClusterSnapshot",
66 "DeleteDBInstance",
67 "DeleteDBInstanceAutomatedBackup",
68 "DeleteDBParameterGroup",
69 "DeleteDBProxy",
70 "DeleteDBProxyEndpoint",
71 "DeleteDBSecurityGroup",
72 "DeleteDBShardGroup",
73 "DeleteDBSnapshot",
74 "DeleteDBSubnetGroup",
75 "DeleteEventSubscription",
76 "DeleteGlobalCluster",
77 "DeleteIntegration",
78 "DeleteOptionGroup",
79 "DeleteTenantDatabase",
80 "DeregisterDBProxyTargets",
81 "DescribeAccountAttributes",
82 "DescribeBlueGreenDeployments",
83 "DescribeCertificates",
84 "DescribeDBClusterAutomatedBackups",
85 "DescribeDBClusterBacktracks",
86 "DescribeDBClusterEndpoints",
87 "DescribeDBClusterParameterGroups",
88 "DescribeDBClusterParameters",
89 "DescribeDBClusterSnapshotAttributes",
90 "DescribeDBClusterSnapshots",
91 "DescribeDBClusters",
92 "DescribeDBEngineVersions",
93 "DescribeDBInstanceAutomatedBackups",
94 "DescribeDBInstances",
95 "DescribeDBLogFiles",
96 "DescribeDBMajorEngineVersions",
97 "DescribeDBParameterGroups",
98 "DescribeDBParameters",
99 "DescribeDBProxies",
100 "DescribeDBProxyEndpoints",
101 "DescribeDBProxyTargetGroups",
102 "DescribeDBProxyTargets",
103 "DescribeDBRecommendations",
104 "DescribeDBSecurityGroups",
105 "DescribeDBShardGroups",
106 "DescribeDBSnapshotAttributes",
107 "DescribeDBSnapshotTenantDatabases",
108 "DescribeDBSnapshots",
109 "DescribeDBSubnetGroups",
110 "DescribeEngineDefaultClusterParameters",
111 "DescribeEngineDefaultParameters",
112 "DescribeEventCategories",
113 "DescribeEventSubscriptions",
114 "DescribeEvents",
115 "DescribeExportTasks",
116 "DescribeGlobalClusters",
117 "DescribeIntegrations",
118 "DescribeOptionGroupOptions",
119 "DescribeOptionGroups",
120 "DescribeOrderableDBInstanceOptions",
121 "DescribePendingMaintenanceActions",
122 "DescribeReservedDBInstances",
123 "DescribeReservedDBInstancesOfferings",
124 "DescribeServerlessV2PlatformVersions",
125 "DescribeSourceRegions",
126 "DescribeTenantDatabases",
127 "DescribeValidDBInstanceModifications",
128 "DisableHttpEndpoint",
129 "DownloadDBLogFilePortion",
130 "EnableHttpEndpoint",
131 "FailoverDBCluster",
132 "FailoverGlobalCluster",
133 "ListTagsForResource",
134 "ModifyActivityStream",
135 "ModifyCertificates",
136 "ModifyCurrentDBClusterCapacity",
137 "ModifyCustomDBEngineVersion",
138 "ModifyDBCluster",
139 "ModifyDBClusterEndpoint",
140 "ModifyDBClusterParameterGroup",
141 "ModifyDBClusterSnapshotAttribute",
142 "ModifyDBInstance",
143 "ModifyDBParameterGroup",
144 "ModifyDBProxy",
145 "ModifyDBProxyEndpoint",
146 "ModifyDBProxyTargetGroup",
147 "ModifyDBRecommendation",
148 "ModifyDBShardGroup",
149 "ModifyDBSnapshot",
150 "ModifyDBSnapshotAttribute",
151 "ModifyDBSubnetGroup",
152 "ModifyEventSubscription",
153 "ModifyGlobalCluster",
154 "ModifyIntegration",
155 "ModifyOptionGroup",
156 "ModifyTenantDatabase",
157 "PromoteReadReplica",
158 "PromoteReadReplicaDBCluster",
159 "PurchaseReservedDBInstancesOffering",
160 "RebootDBCluster",
161 "RebootDBInstance",
162 "RebootDBShardGroup",
163 "RegisterDBProxyTargets",
164 "RemoveFromGlobalCluster",
165 "RemoveRoleFromDBCluster",
166 "RemoveRoleFromDBInstance",
167 "RemoveSourceIdentifierFromSubscription",
168 "RemoveTagsFromResource",
169 "ResetDBClusterParameterGroup",
170 "ResetDBParameterGroup",
171 "RestoreDBClusterFromS3",
172 "RestoreDBClusterFromSnapshot",
173 "RestoreDBClusterToPointInTime",
174 "RestoreDBInstanceFromDBSnapshot",
175 "RestoreDBInstanceFromS3",
176 "RestoreDBInstanceToPointInTime",
177 "RevokeDBSecurityGroupIngress",
178 "StartActivityStream",
179 "StartDBCluster",
180 "StartDBInstance",
181 "StartDBInstanceAutomatedBackupsReplication",
182 "StartExportTask",
183 "StopActivityStream",
184 "StopDBCluster",
185 "StopDBInstance",
186 "StopDBInstanceAutomatedBackupsReplication",
187 "SwitchoverBlueGreenDeployment",
188 "SwitchoverGlobalCluster",
189 "SwitchoverReadReplica",
190];
191
192pub struct RdsService {
193 pub(crate) state: SharedRdsState,
194 runtime: Option<Arc<RdsRuntime>>,
195 snapshot_store: Option<Arc<dyn SnapshotStore>>,
196 snapshot_lock: Arc<AsyncMutex<()>>,
197 pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
198}
199
200#[derive(Clone, Copy)]
202#[allow(dead_code, clippy::enum_variant_names)]
203pub(crate) enum RdsSourceType {
204 DbInstance,
205 DbSnapshot,
206 DbParameterGroup,
207 DbCluster,
208 DbClusterSnapshot,
209}
210
211impl RdsSourceType {
212 fn as_str(self) -> &'static str {
215 match self {
216 Self::DbInstance => "DB_INSTANCE",
217 Self::DbSnapshot => "DB_SNAPSHOT",
218 Self::DbParameterGroup => "DB_PARAMETER_GROUP",
219 Self::DbCluster => "DB_CLUSTER",
220 Self::DbClusterSnapshot => "DB_CLUSTER_SNAPSHOT",
221 }
222 }
223
224 pub(crate) fn describe_events_str(self) -> &'static str {
229 match self {
230 Self::DbInstance => "db-instance",
231 Self::DbSnapshot => "db-snapshot",
232 Self::DbParameterGroup => "db-parameter-group",
233 Self::DbCluster => "db-cluster",
234 Self::DbClusterSnapshot => "db-cluster-snapshot",
235 }
236 }
237
238 fn detail_type(self) -> &'static str {
239 match self {
240 Self::DbInstance => "RDS DB Instance Event",
241 Self::DbSnapshot => "RDS DB Snapshot Event",
242 Self::DbParameterGroup => "RDS DB Parameter Group Event",
243 Self::DbCluster => "RDS DB Cluster Event",
244 Self::DbClusterSnapshot => "RDS DB Cluster Snapshot Event",
245 }
246 }
247}
248
249mod cluster_snapshots;
250mod engine;
251mod instances;
252mod log_files;
253mod parameter_groups;
254mod replicas;
255mod restore;
256mod snapshots;
257mod subnet_groups;
258mod tags;
259
260impl RdsService {
261 pub(crate) fn state_handle(&self) -> &SharedRdsState {
262 &self.state
263 }
264}
265
266impl RdsService {
267 pub fn new(state: SharedRdsState) -> Self {
268 Self {
269 state,
270 runtime: None,
271 snapshot_store: None,
272 snapshot_lock: Arc::new(AsyncMutex::new(())),
273 delivery_bus: None,
274 }
275 }
276
277 pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
278 self.runtime = Some(runtime);
279 self
280 }
281
282 pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
286 self.runtime.as_ref()
287 }
288
289 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
290 self.snapshot_store = Some(store);
291 self
292 }
293
294 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
295 self.delivery_bus = Some(bus);
296 self
297 }
298
299 pub(crate) fn emit_event(
304 &self,
305 source_type: RdsSourceType,
306 source_identifier: &str,
307 source_arn: &str,
308 event_id: &str,
309 event_categories: &[&str],
310 message: &str,
311 ) {
312 let account_id = source_arn.split(':').nth(4).unwrap_or("");
315 emit_event_static_with_state(
316 self.delivery_bus.as_ref(),
317 Some(&self.state),
318 if account_id.is_empty() {
319 None
320 } else {
321 Some(account_id)
322 },
323 source_type,
324 source_identifier,
325 source_arn,
326 event_id,
327 event_categories,
328 message,
329 );
330 }
331
332 async fn save_snapshot(&self) {
333 save_snapshot_static(
334 self.state.clone(),
335 self.snapshot_store.clone(),
336 self.snapshot_lock.clone(),
337 )
338 .await;
339 }
340
341 pub async fn recover_persisted_containers(&self) {
350 let Some(runtime) = self.runtime.clone() else {
351 return;
352 };
353
354 struct Pending {
355 account_id: String,
356 region: String,
357 id: String,
358 arn: String,
359 engine: String,
360 engine_version: String,
361 username: String,
362 password: String,
363 db_name: String,
364 tags: Vec<crate::state::RdsTag>,
365 }
366
367 let pending: Vec<Pending> = {
368 let mut accounts = self.state.write();
369 let mut out = Vec::new();
370 for (_, state) in accounts.iter_mut() {
371 let account_id = state.account_id.clone();
372 let region = state.region.clone();
373 for (id, inst) in state.instances.iter_mut() {
374 if !matches!(
382 inst.db_instance_status.as_str(),
383 "creating"
384 | "available"
385 | "starting"
386 | "modifying"
387 | "rebooting"
388 | "backing-up"
389 ) {
390 continue;
391 }
392 inst.db_instance_status = "starting".to_string();
393 out.push(Pending {
394 account_id: account_id.clone(),
395 region: region.clone(),
396 id: id.clone(),
397 arn: inst.db_instance_arn.clone(),
398 engine: inst.engine.clone(),
399 engine_version: inst.engine_version.clone(),
400 username: inst.master_username.clone(),
401 password: inst.master_user_password.clone(),
402 db_name: inst
403 .db_name
404 .clone()
405 .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
406 tags: inst.tags.clone(),
407 });
408 }
409 }
410 out
411 };
412
413 if pending.is_empty() {
414 return;
415 }
416 tracing::info!(
417 count = pending.len(),
418 "recovering backing containers for persisted rds instances",
419 );
420
421 for p in pending {
422 let runtime = runtime.clone();
423 let state = self.state.clone();
424 let snapshot_store = self.snapshot_store.clone();
425 let snapshot_lock = self.snapshot_lock.clone();
426 let delivery_bus = self.delivery_bus.clone();
427 tokio::spawn(async move {
428 match runtime
429 .ensure_postgres(
430 &p.id,
431 &p.engine,
432 &p.engine_version,
433 &p.username,
434 &p.password,
435 &p.db_name,
436 &p.account_id,
437 &p.region,
438 &p.tags,
439 )
440 .await
441 {
442 Ok(running) => {
443 {
444 let mut accounts = state.write();
445 if let Some(s) = accounts.get_mut(&p.account_id) {
446 if let Some(inst) = s.instances.get_mut(&p.id) {
447 inst.db_instance_status = "available".to_string();
448 inst.endpoint_address = running.endpoint_address.clone();
449 inst.port = i32::from(running.endpoint_port);
450 inst.host_port = running.host_port;
451 inst.container_id = running.container_id;
452 }
453 }
454 }
455 save_snapshot_static(
456 state.clone(),
457 snapshot_store.clone(),
458 snapshot_lock.clone(),
459 )
460 .await;
461 emit_event_static(
462 delivery_bus.as_ref(),
463 RdsSourceType::DbInstance,
464 &p.id,
465 &p.arn,
466 "RDS-EVENT-0088",
467 &["notification"],
468 "DB instance restarted after fakecloud restart",
469 );
470 }
471 Err(error) => {
472 tracing::error!(
473 %error,
474 db_instance_identifier = %p.id,
475 "failed to recover rds backing container after restart",
476 );
477 {
478 let mut accounts = state.write();
479 if let Some(s) = accounts.get_mut(&p.account_id) {
480 if let Some(inst) = s.instances.get_mut(&p.id) {
481 inst.db_instance_status = "failed".to_string();
482 }
483 }
484 }
485 save_snapshot_static(state, snapshot_store, snapshot_lock).await;
486 }
487 }
488 });
489 }
490 }
491
492 async fn stop_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
498 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
499
500 let arn = {
505 let accounts = self.state.read();
506 let empty = RdsState::new(&request.account_id, &request.region);
507 let state = accounts.get(&request.account_id).unwrap_or(&empty);
508 state
509 .instances
510 .get(&db_instance_identifier)
511 .map(|i| i.db_instance_arn.clone())
512 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
513 };
514
515 if let Some(runtime) = self.runtime.as_ref() {
516 runtime.stop_container(&db_instance_identifier).await;
517 }
518
519 let instance = {
520 let mut accounts = self.state.write();
521 let state = accounts.get_or_create(&request.account_id);
522 let inst = state
523 .instances
524 .get_mut(&db_instance_identifier)
525 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
526 inst.db_instance_status = "stopped".to_string();
527 inst.container_id = String::new();
528 inst.clone()
529 };
530
531 self.emit_event(
532 RdsSourceType::DbInstance,
533 &db_instance_identifier,
534 &arn,
535 "RDS-EVENT-0089",
536 &["notification"],
537 "DB instance stopped",
538 );
539
540 Ok(AwsResponse::xml(
541 StatusCode::OK,
542 query_response_xml(
543 "StopDBInstance",
544 RDS_NS,
545 &format!(
546 "<DBInstance>{}</DBInstance>",
547 db_instance_xml(&instance, Some("stopped"))
548 ),
549 &request.request_id,
550 ),
551 ))
552 }
553
554 async fn start_db_instance(
557 &self,
558 request: &AwsRequest,
559 ) -> Result<AwsResponse, AwsServiceError> {
560 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
561
562 let instance = {
566 let accounts = self.state.read();
567 let empty = RdsState::new(&request.account_id, &request.region);
568 let state = accounts.get(&request.account_id).unwrap_or(&empty);
569 state
570 .instances
571 .get(&db_instance_identifier)
572 .cloned()
573 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
574 };
575
576 {
579 let mut accounts = self.state.write();
580 let state = accounts.get_or_create(&request.account_id);
581 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
582 inst.db_instance_status = "starting".to_string();
583 }
584 }
585
586 let running = if let Some(runtime) = self.runtime.as_ref() {
587 match runtime
588 .ensure_postgres(
589 &db_instance_identifier,
590 &instance.engine,
591 &instance.engine_version,
592 &instance.master_username,
593 &instance.master_user_password,
594 instance
595 .db_name
596 .as_deref()
597 .unwrap_or(default_db_name(&instance.engine)),
598 &request.account_id,
599 &request.region,
600 &instance.tags,
601 )
602 .await
603 {
604 Ok(r) => Some(r),
605 Err(e) => {
606 let mut accounts = self.state.write();
610 let state = accounts.get_or_create(&request.account_id);
611 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
612 inst.db_instance_status = "stopped".to_string();
613 }
614 return Err(runtime_error_to_service_error(e));
615 }
616 }
617 } else {
618 {
623 let mut accounts = self.state.write();
624 let state = accounts.get_or_create(&request.account_id);
625 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
626 inst.db_instance_status = "stopped".to_string();
627 }
628 }
629 return Err(AwsServiceError::aws_error(
630 StatusCode::SERVICE_UNAVAILABLE,
631 "InternalFailure",
632 "Container runtime is not configured; cannot start DB instance",
633 ));
634 };
635
636 let instance = {
637 let mut accounts = self.state.write();
638 let state = accounts.get_or_create(&request.account_id);
639 let inst = state
640 .instances
641 .get_mut(&db_instance_identifier)
642 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
643 inst.db_instance_status = "available".to_string();
644 inst.endpoint_address = "127.0.0.1".to_string();
645 if let Some(r) = running {
646 inst.endpoint_address = r.endpoint_address.clone();
647 inst.port = i32::from(r.endpoint_port);
648 inst.host_port = r.host_port;
649 inst.container_id = r.container_id;
650 }
651 inst.clone()
652 };
653
654 self.emit_event(
655 RdsSourceType::DbInstance,
656 &db_instance_identifier,
657 &instance.db_instance_arn,
658 "RDS-EVENT-0088",
659 &["notification"],
660 "DB instance started",
661 );
662
663 Ok(AwsResponse::xml(
664 StatusCode::OK,
665 query_response_xml(
666 "StartDBInstance",
667 RDS_NS,
668 &format!(
669 "<DBInstance>{}</DBInstance>",
670 db_instance_xml(&instance, None)
671 ),
672 &request.request_id,
673 ),
674 ))
675 }
676}
677
678fn is_declared_add_tags_not_found(code: &str) -> bool {
686 matches!(
687 code,
688 "BlueGreenDeploymentNotFoundFault"
689 | "DBClusterNotFoundFault"
690 | "DBInstanceNotFound"
691 | "DBProxyEndpointNotFoundFault"
692 | "DBProxyNotFoundFault"
693 | "DBProxyTargetGroupNotFoundFault"
694 | "DBShardGroupNotFound"
695 | "DBSnapshotNotFound"
696 | "DBSnapshotTenantDatabaseNotFoundFault"
697 | "IntegrationNotFoundFault"
698 | "InvalidDBClusterEndpointStateFault"
699 | "InvalidDBClusterStateFault"
700 | "InvalidDBInstanceState"
701 | "TenantDatabaseNotFound"
702 )
703}
704
705async fn save_snapshot_static(
707 state: SharedRdsState,
708 store: Option<Arc<dyn SnapshotStore>>,
709 lock: Arc<AsyncMutex<()>>,
710) {
711 let Some(store) = store else {
712 return;
713 };
714 let _guard = lock.lock().await;
715 let snapshot = RdsSnapshot {
716 schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
717 state: None,
718 accounts: Some(state.read().clone()),
719 };
720 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
721 let bytes = serde_json::to_vec(&snapshot)
722 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
723 store.save(&bytes)
724 })
725 .await;
726 match join {
727 Ok(Ok(())) => {}
728 Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
729 Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
730 }
731}
732
733impl RdsService {
734 fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
745 self.runtime.as_ref().ok_or_else(|| {
746 AwsServiceError::aws_error(
747 StatusCode::SERVICE_UNAVAILABLE,
748 "InsufficientDBInstanceCapacity",
749 "Docker/Podman is required for RDS DB instances but is not available",
750 )
751 })
752 }
753
754 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
758 let store = self.snapshot_store.clone()?;
759 let state = self.state.clone();
760 let lock = self.snapshot_lock.clone();
761 Some(Arc::new(move || {
762 let state = state.clone();
763 let store = store.clone();
764 let lock = lock.clone();
765 Box::pin(async move {
766 save_snapshot_static(state, Some(store), lock).await;
767 })
768 }))
769 }
770}
771
772#[async_trait]
773impl AwsService for RdsService {
774 fn service_name(&self) -> &str {
775 "rds"
776 }
777
778 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
779 crate::validation::prevalidate(request.action.as_str(), &request)?;
784
785 let mutates = is_mutating_action(request.action.as_str());
786 let result = match request.action.as_str() {
787 "AddTagsToResource" => self.add_tags_to_resource(&request),
788 "CreateDBInstance" => self.create_db_instance(&request).await,
789 "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
790 "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
791 "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
792 "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
793 "DeleteDBInstance" => self.delete_db_instance(&request).await,
794 "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
795 "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
796 "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
797 "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
798 "DescribeDBInstances" => self.describe_db_instances(&request),
799 "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
800 "DescribeDBParameters" => self.describe_db_parameters_real(&request),
801 "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
802 "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
803 "DescribeOrderableDBInstanceOptions" => {
804 self.describe_orderable_db_instance_options(&request)
805 }
806 "ListTagsForResource" => self.list_tags_for_resource(&request),
807 "ModifyDBInstance" => self.modify_db_instance(&request),
808 "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
809 "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
810 "RebootDBInstance" => self.reboot_db_instance(&request).await,
811 "StartDBInstance" => self.start_db_instance(&request).await,
812 "StopDBInstance" => self.stop_db_instance(&request).await,
813 "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
814 "RestoreDBInstanceFromDBSnapshot" => {
815 self.restore_db_instance_from_db_snapshot(&request).await
816 }
817 "RestoreDBInstanceToPointInTime" => {
818 self.restore_db_instance_to_point_in_time(&request).await
819 }
820 "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
821 "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
822 "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
823 "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
824 "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
825 "RestoreDBClusterToPointInTime" => {
826 self.restore_db_cluster_to_point_in_time(&request).await
827 }
828 _ => self.handle_extra_action(&request),
829 };
830 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
831 self.save_snapshot().await;
832 }
833 result
834 }
835
836 fn supported_actions(&self) -> &[&str] {
837 SUPPORTED_ACTIONS
838 }
839}
840
841impl RdsService {}
842
843pub(crate) fn render_user_parameter_xml(name: &str, value: &str) -> String {
847 format!(
848 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>user</Source>\n <ApplyType>dynamic</ApplyType>\n <DataType>string</DataType>\n <IsModifiable>true</IsModifiable>\n </Parameter>\n",
849 xml_escape(name),
850 xml_escape(value),
851 )
852}
853
854pub(crate) fn render_engine_default_parameter_xml(
858 default: &crate::state::EngineDefaultParameter,
859) -> String {
860 format!(
861 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>engine-default</Source>\n <ApplyType>{}</ApplyType>\n <DataType>{}</DataType>\n <AllowedValues>{}</AllowedValues>\n <IsModifiable>{}</IsModifiable>\n </Parameter>\n",
862 xml_escape(default.name),
863 xml_escape(default.value),
864 xml_escape(default.apply_type),
865 xml_escape(default.data_type),
866 xml_escape(default.allowed_values),
867 default.is_modifiable,
868 )
869}
870
871pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<(String, String)> {
879 let mut out = Vec::new();
880 for prefix in ["Parameters.Parameter", "Parameters.member"] {
881 let mut index = 1;
882 loop {
883 let name_key = format!("{prefix}.{index}.ParameterName");
884 let value_key = format!("{prefix}.{index}.ParameterValue");
885 let name = optional_query_param(request, &name_key);
886 let value = optional_query_param(request, &value_key);
887 if name.is_none() && value.is_none() {
888 break;
889 }
890 if let (Some(n), Some(v)) = (name, value) {
891 if !n.is_empty() {
892 out.push((n, v));
893 }
894 }
895 index += 1;
896 }
897 }
898 out
899}
900
901fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
905 match (engine, log_file_name) {
906 (_, "error/postgres.log") => "/var/log/postgresql/postgresql.log".to_string(),
907 (_, "trace/postgres-trace.log") => "/var/log/postgresql/postgresql.log".to_string(),
908 ("mysql" | "mariadb", "error/mysql-error.log") => "/var/log/mysql/error.log".to_string(),
909 ("mysql" | "mariadb", "slowquery/mysql-slowquery.log") => {
910 "/var/log/mysql/slow.log".to_string()
911 }
912 _ => log_file_name.to_string(),
913 }
914}
915
916pub(crate) struct PaginationResult<T> {
917 items: Vec<T>,
918 next_marker: Option<String>,
919}
920
921fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
925 use serde_json::{json, Value};
926 let Some(map) = state.extras.get_mut("clusters") else {
927 return;
928 };
929 let Some(entry) = map.get_mut(cluster_id) else {
930 return;
931 };
932 let Some(obj) = entry.as_object_mut() else {
933 return;
934 };
935 let mut members: Vec<Value> = obj
936 .get("DBClusterMembers")
937 .and_then(|v| v.as_array())
938 .cloned()
939 .unwrap_or_default();
940 if members
941 .iter()
942 .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
943 {
944 return;
945 }
946 let has_writer = members
947 .iter()
948 .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
949 let promotion_tier = (members.len() as i64) + 1;
950 members.push(json!({
951 "DBInstanceIdentifier": instance_id,
952 "IsClusterWriter": !has_writer,
953 "DBClusterParameterGroupStatus": "in-sync",
954 "PromotionTier": promotion_tier,
955 }));
956 obj.insert("DBClusterMembers".to_string(), Value::Array(members));
957 if !has_writer {
958 obj.insert(
959 "WriterDBInstanceIdentifier".to_string(),
960 Value::String(instance_id.to_string()),
961 );
962 }
963}
964
965#[path = "../service_helpers.rs"]
966mod service_helpers;
967pub(crate) use service_helpers::*;
968
969#[cfg(test)]
970#[path = "../service_tests.rs"]
971mod tests;