1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18 default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19 DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20 SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
/// XML namespace handed to `query_response_xml` when this file renders RDS responses.
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
/// Action names returned by `RdsService::supported_actions`.
///
/// Only some of these are dispatched by name in `RdsService::handle`; every
/// other action name is forwarded to `handle_extra_action`.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddRoleToDBCluster",
    "AddRoleToDBInstance",
    "AddSourceIdentifierToSubscription",
    "AddTagsToResource",
    "ApplyPendingMaintenanceAction",
    "AuthorizeDBSecurityGroupIngress",
    "BacktrackDBCluster",
    "CancelExportTask",
    "CopyDBClusterParameterGroup",
    "CopyDBClusterSnapshot",
    "CopyDBParameterGroup",
    "CopyDBSnapshot",
    "CopyOptionGroup",
    "CreateBlueGreenDeployment",
    "CreateCustomDBEngineVersion",
    "CreateDBCluster",
    "CreateDBClusterEndpoint",
    "CreateDBClusterParameterGroup",
    "CreateDBClusterSnapshot",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBProxy",
    "CreateDBProxyEndpoint",
    "CreateDBSecurityGroup",
    "CreateDBShardGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "CreateEventSubscription",
    "CreateGlobalCluster",
    "CreateIntegration",
    "CreateOptionGroup",
    "CreateTenantDatabase",
    "DeleteBlueGreenDeployment",
    "DeleteCustomDBEngineVersion",
    "DeleteDBCluster",
    "DeleteDBClusterAutomatedBackup",
    "DeleteDBClusterEndpoint",
    "DeleteDBClusterParameterGroup",
    "DeleteDBClusterSnapshot",
    "DeleteDBInstance",
    "DeleteDBInstanceAutomatedBackup",
    "DeleteDBParameterGroup",
    "DeleteDBProxy",
    "DeleteDBProxyEndpoint",
    "DeleteDBSecurityGroup",
    "DeleteDBShardGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DeleteEventSubscription",
    "DeleteGlobalCluster",
    "DeleteIntegration",
    "DeleteOptionGroup",
    "DeleteTenantDatabase",
    "DeregisterDBProxyTargets",
    "DescribeAccountAttributes",
    "DescribeBlueGreenDeployments",
    "DescribeCertificates",
    "DescribeDBClusterAutomatedBackups",
    "DescribeDBClusterBacktracks",
    "DescribeDBClusterEndpoints",
    "DescribeDBClusterParameterGroups",
    "DescribeDBClusterParameters",
    "DescribeDBClusterSnapshotAttributes",
    "DescribeDBClusterSnapshots",
    "DescribeDBClusters",
    "DescribeDBEngineVersions",
    "DescribeDBInstanceAutomatedBackups",
    "DescribeDBInstances",
    "DescribeDBLogFiles",
    "DescribeDBMajorEngineVersions",
    "DescribeDBParameterGroups",
    "DescribeDBParameters",
    "DescribeDBProxies",
    "DescribeDBProxyEndpoints",
    "DescribeDBProxyTargetGroups",
    "DescribeDBProxyTargets",
    "DescribeDBRecommendations",
    "DescribeDBSecurityGroups",
    "DescribeDBShardGroups",
    "DescribeDBSnapshotAttributes",
    "DescribeDBSnapshotTenantDatabases",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeEngineDefaultClusterParameters",
    "DescribeEngineDefaultParameters",
    "DescribeEventCategories",
    "DescribeEventSubscriptions",
    "DescribeEvents",
    "DescribeExportTasks",
    "DescribeGlobalClusters",
    "DescribeIntegrations",
    "DescribeOptionGroupOptions",
    "DescribeOptionGroups",
    "DescribeOrderableDBInstanceOptions",
    "DescribePendingMaintenanceActions",
    "DescribeReservedDBInstances",
    "DescribeReservedDBInstancesOfferings",
    "DescribeServerlessV2PlatformVersions",
    "DescribeSourceRegions",
    "DescribeTenantDatabases",
    "DescribeValidDBInstanceModifications",
    "DisableHttpEndpoint",
    "DownloadDBLogFilePortion",
    "EnableHttpEndpoint",
    "FailoverDBCluster",
    "FailoverGlobalCluster",
    "ListTagsForResource",
    "ModifyActivityStream",
    "ModifyCertificates",
    "ModifyCurrentDBClusterCapacity",
    "ModifyCustomDBEngineVersion",
    "ModifyDBCluster",
    "ModifyDBClusterEndpoint",
    "ModifyDBClusterParameterGroup",
    "ModifyDBClusterSnapshotAttribute",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBProxy",
    "ModifyDBProxyEndpoint",
    "ModifyDBProxyTargetGroup",
    "ModifyDBRecommendation",
    "ModifyDBShardGroup",
    "ModifyDBSnapshot",
    "ModifyDBSnapshotAttribute",
    "ModifyDBSubnetGroup",
    "ModifyEventSubscription",
    "ModifyGlobalCluster",
    "ModifyIntegration",
    "ModifyOptionGroup",
    "ModifyTenantDatabase",
    "PromoteReadReplica",
    "PromoteReadReplicaDBCluster",
    "PurchaseReservedDBInstancesOffering",
    "RebootDBCluster",
    "RebootDBInstance",
    "RebootDBShardGroup",
    "RegisterDBProxyTargets",
    "RemoveFromGlobalCluster",
    "RemoveRoleFromDBCluster",
    "RemoveRoleFromDBInstance",
    "RemoveSourceIdentifierFromSubscription",
    "RemoveTagsFromResource",
    "ResetDBClusterParameterGroup",
    "ResetDBParameterGroup",
    "RestoreDBClusterFromS3",
    "RestoreDBClusterFromSnapshot",
    "RestoreDBClusterToPointInTime",
    "RestoreDBInstanceFromDBSnapshot",
    "RestoreDBInstanceFromS3",
    "RestoreDBInstanceToPointInTime",
    "RevokeDBSecurityGroupIngress",
    "StartActivityStream",
    "StartDBCluster",
    "StartDBInstance",
    "StartDBInstanceAutomatedBackupsReplication",
    "StartExportTask",
    "StopActivityStream",
    "StopDBCluster",
    "StopDBInstance",
    "StopDBInstanceAutomatedBackupsReplication",
    "SwitchoverBlueGreenDeployment",
    "SwitchoverGlobalCluster",
    "SwitchoverReadReplica",
];
191
/// RDS service handler: owns the shared state handle plus the optional
/// collaborators (container runtime, snapshot store, delivery bus) that the
/// `with_*` builders attach.
pub struct RdsService {
    /// Shared RDS state; accessed through `read()` / `write()` guards.
    pub(crate) state: SharedRdsState,
    /// Container runtime used to start/stop backing containers. `None` until
    /// `with_runtime` is called.
    runtime: Option<Arc<RdsRuntime>>,
    /// Destination for serialized state snapshots. When `None`, snapshot
    /// saving is a no-op.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held while a snapshot is being serialized and written.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Bus handed to the event-emission helpers. `None` until
    /// `with_delivery_bus` is called.
    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
}
199
/// Kind of RDS resource an event refers to.
///
/// Each variant has three textual spellings, exposed by [`RdsSourceType::as_str`],
/// [`RdsSourceType::describe_events_str`] and [`RdsSourceType::detail_type`].
#[derive(Clone, Copy)]
// `enum_variant_names`: every variant intentionally carries the `Db` prefix.
// NOTE(review): `dead_code` presumably covers variants not constructed in every build — confirm.
#[allow(dead_code, clippy::enum_variant_names)]
pub(crate) enum RdsSourceType {
    DbInstance,
    DbSnapshot,
    DbParameterGroup,
    DbCluster,
    DbClusterSnapshot,
}

impl RdsSourceType {
    /// Single source of truth for the spellings of each variant, as
    /// `(as_str, describe_events_str, detail_type)`.
    ///
    /// Kept as one exhaustive `match` so adding a variant forces all three
    /// spellings to be supplied together.
    fn labels(self) -> (&'static str, &'static str, &'static str) {
        match self {
            Self::DbInstance => ("DB_INSTANCE", "db-instance", "RDS DB Instance Event"),
            Self::DbSnapshot => ("DB_SNAPSHOT", "db-snapshot", "RDS DB Snapshot Event"),
            Self::DbParameterGroup => (
                "DB_PARAMETER_GROUP",
                "db-parameter-group",
                "RDS DB Parameter Group Event",
            ),
            Self::DbCluster => ("DB_CLUSTER", "db-cluster", "RDS DB Cluster Event"),
            Self::DbClusterSnapshot => (
                "DB_CLUSTER_SNAPSHOT",
                "db-cluster-snapshot",
                "RDS DB Cluster Snapshot Event",
            ),
        }
    }

    /// Upper-snake spelling, e.g. `DB_INSTANCE`.
    fn as_str(self) -> &'static str {
        self.labels().0
    }

    /// Lower-kebab spelling, e.g. `db-instance`.
    pub(crate) fn describe_events_str(self) -> &'static str {
        self.labels().1
    }

    /// Human-readable event title, e.g. `RDS DB Instance Event`.
    fn detail_type(self) -> &'static str {
        self.labels().2
    }
}
248
249mod cluster_snapshots;
250mod engine;
251mod instances;
252mod log_files;
253mod parameter_groups;
254mod replicas;
255mod restore;
256mod snapshots;
257mod subnet_groups;
258mod tags;
259
impl RdsService {
    /// Borrows the shared state handle held by this service.
    pub(crate) fn state_handle(&self) -> &SharedRdsState {
        &self.state
    }
}
265
266impl RdsService {
    /// Creates a service over `state` with no runtime, snapshot store or
    /// delivery bus attached, and a fresh snapshot lock.
    ///
    /// Use the `with_*` builders to attach the optional collaborators.
    pub fn new(state: SharedRdsState) -> Self {
        Self {
            state,
            runtime: None,
            snapshot_store: None,
            snapshot_lock: Arc::new(AsyncMutex::new(())),
            delivery_bus: None,
        }
    }
276
    /// Builder: attaches the container runtime and returns the service.
    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
        self.runtime = Some(runtime);
        self
    }
281
    /// Borrows the container runtime, or `None` when none has been attached.
    pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
        self.runtime.as_ref()
    }
288
    /// Builder: attaches the store that state snapshots are written to and
    /// returns the service.
    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
        self.snapshot_store = Some(store);
        self
    }
293
    /// Builder: attaches the delivery bus passed to the event-emission
    /// helpers and returns the service.
    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
        self.delivery_bus = Some(bus);
        self
    }
298
299 pub(crate) fn emit_event(
304 &self,
305 source_type: RdsSourceType,
306 source_identifier: &str,
307 source_arn: &str,
308 event_id: &str,
309 event_categories: &[&str],
310 message: &str,
311 ) {
312 let account_id = source_arn.split(':').nth(4).unwrap_or("");
315 emit_event_static_with_state(
316 self.delivery_bus.as_ref(),
317 Some(&self.state),
318 if account_id.is_empty() {
319 None
320 } else {
321 Some(account_id)
322 },
323 source_type,
324 source_identifier,
325 source_arn,
326 event_id,
327 event_categories,
328 message,
329 );
330 }
331
    /// Persists the current state by calling `save_snapshot_static` with
    /// clones of this service's state handle, snapshot store and snapshot lock.
    async fn save_snapshot(&self) {
        save_snapshot_static(
            self.state.clone(),
            self.snapshot_store.clone(),
            self.snapshot_lock.clone(),
        )
        .await;
    }
340
    /// Re-provisions backing containers for instances already present in state.
    ///
    /// Does nothing when no runtime is attached. Otherwise, every instance
    /// whose status is one of `creating`, `available`, `starting`,
    /// `modifying`, `rebooting` or `backing-up` is flipped to `starting`, and
    /// one detached task per instance calls `ensure_postgres`:
    ///
    /// * on success the instance becomes `available`, its endpoint/port/
    ///   container fields are refreshed, the state is snapshotted and an
    ///   `RDS-EVENT-0088` event is emitted;
    /// * on failure the error is logged, the instance becomes `failed`, and
    ///   the state is snapshotted.
    ///
    /// The method returns once the tasks are spawned; it does not wait for them.
    pub async fn recover_persisted_containers(&self) {
        let Some(runtime) = self.runtime.clone() else {
            return;
        };

        // Owned copy of everything a recovery task needs, so no state lock is
        // held while the container is being brought up.
        struct Pending {
            account_id: String,
            region: String,
            id: String,
            arn: String,
            engine: String,
            engine_version: String,
            username: String,
            password: String,
            db_name: String,
            tags: Vec<crate::state::RdsTag>,
        }

        // Collect candidates under a single write lock; the guard is dropped at
        // the end of this block, before anything is awaited.
        let pending: Vec<Pending> = {
            let mut accounts = self.state.write();
            let mut out = Vec::new();
            for (_, state) in accounts.iter_mut() {
                let account_id = state.account_id.clone();
                let region = state.region.clone();
                for (id, inst) in state.instances.iter_mut() {
                    // Instances in any other status are left untouched.
                    if !matches!(
                        inst.db_instance_status.as_str(),
                        "creating"
                            | "available"
                            | "starting"
                            | "modifying"
                            | "rebooting"
                            | "backing-up"
                    ) {
                        continue;
                    }
                    inst.db_instance_status = "starting".to_string();
                    out.push(Pending {
                        account_id: account_id.clone(),
                        region: region.clone(),
                        id: id.clone(),
                        arn: inst.db_instance_arn.clone(),
                        engine: inst.engine.clone(),
                        engine_version: inst.engine_version.clone(),
                        username: inst.master_username.clone(),
                        password: inst.master_user_password.clone(),
                        // Fall back to the engine's default database name when
                        // the instance has none recorded.
                        db_name: inst
                            .db_name
                            .clone()
                            .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
                        tags: inst.tags.clone(),
                    });
                }
            }
            out
        };

        if pending.is_empty() {
            return;
        }
        tracing::info!(
            count = pending.len(),
            "recovering backing containers for persisted rds instances",
        );

        // One detached task per instance; the join handles are dropped.
        for p in pending {
            let runtime = runtime.clone();
            let state = self.state.clone();
            let snapshot_store = self.snapshot_store.clone();
            let snapshot_lock = self.snapshot_lock.clone();
            let delivery_bus = self.delivery_bus.clone();
            tokio::spawn(async move {
                match runtime
                    .ensure_postgres(
                        &p.id,
                        &p.engine,
                        &p.engine_version,
                        &p.username,
                        &p.password,
                        &p.db_name,
                        &p.account_id,
                        &p.region,
                        &p.tags,
                    )
                    .await
                {
                    Ok(running) => {
                        // Inner block so the write guard is released before
                        // the snapshot save is awaited.
                        {
                            let mut accounts = state.write();
                            if let Some(s) = accounts.get_mut(&p.account_id) {
                                if let Some(inst) = s.instances.get_mut(&p.id) {
                                    inst.db_instance_status = "available".to_string();
                                    inst.endpoint_address = running.endpoint_address.clone();
                                    inst.port = i32::from(running.endpoint_port);
                                    inst.host_port = running.host_port;
                                    inst.container_id = running.container_id;
                                }
                            }
                        }
                        save_snapshot_static(
                            state.clone(),
                            snapshot_store.clone(),
                            snapshot_lock.clone(),
                        )
                        .await;
                        emit_event_static(
                            delivery_bus.as_ref(),
                            RdsSourceType::DbInstance,
                            &p.id,
                            &p.arn,
                            "RDS-EVENT-0088",
                            &["notification"],
                            "DB instance restarted after fakecloud restart",
                        );
                    }
                    Err(error) => {
                        tracing::error!(
                            %error,
                            db_instance_identifier = %p.id,
                            "failed to recover rds backing container after restart",
                        );
                        // Same guard scoping as the success path.
                        {
                            let mut accounts = state.write();
                            if let Some(s) = accounts.get_mut(&p.account_id) {
                                if let Some(inst) = s.instances.get_mut(&p.id) {
                                    inst.db_instance_status = "failed".to_string();
                                }
                            }
                        }
                        save_snapshot_static(state, snapshot_store, snapshot_lock).await;
                    }
                }
            });
        }
    }
491
492 async fn stop_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
498 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
499
500 let arn = {
505 let accounts = self.state.read();
506 let empty = RdsState::new(&request.account_id, &request.region);
507 let state = accounts.get(&request.account_id).unwrap_or(&empty);
508 state
509 .instances
510 .get(&db_instance_identifier)
511 .map(|i| i.db_instance_arn.clone())
512 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
513 };
514
515 if let Some(runtime) = self.runtime.as_ref() {
516 runtime.stop_container(&db_instance_identifier).await;
517 }
518
519 let instance = {
520 let mut accounts = self.state.write();
521 let state = accounts.get_or_create(&request.account_id);
522 let inst = state
523 .instances
524 .get_mut(&db_instance_identifier)
525 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
526 inst.db_instance_status = "stopped".to_string();
527 inst.container_id = String::new();
528 inst.clone()
529 };
530
531 self.emit_event(
532 RdsSourceType::DbInstance,
533 &db_instance_identifier,
534 &arn,
535 "RDS-EVENT-0089",
536 &["notification"],
537 "DB instance stopped",
538 );
539
540 Ok(AwsResponse::xml(
541 StatusCode::OK,
542 query_response_xml(
543 "StopDBInstance",
544 RDS_NS,
545 &format!(
546 "<DBInstance>{}</DBInstance>",
547 db_instance_xml(&instance, Some("stopped"))
548 ),
549 &request.request_id,
550 ),
551 ))
552 }
553
554 async fn start_db_instance(
557 &self,
558 request: &AwsRequest,
559 ) -> Result<AwsResponse, AwsServiceError> {
560 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
561
562 let instance = {
566 let accounts = self.state.read();
567 let empty = RdsState::new(&request.account_id, &request.region);
568 let state = accounts.get(&request.account_id).unwrap_or(&empty);
569 state
570 .instances
571 .get(&db_instance_identifier)
572 .cloned()
573 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
574 };
575
576 {
579 let mut accounts = self.state.write();
580 let state = accounts.get_or_create(&request.account_id);
581 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
582 inst.db_instance_status = "starting".to_string();
583 }
584 }
585
586 let Some(runtime) = self.runtime.clone() else {
591 {
592 let mut accounts = self.state.write();
593 let state = accounts.get_or_create(&request.account_id);
594 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
595 inst.db_instance_status = "stopped".to_string();
596 }
597 }
598 return Err(AwsServiceError::aws_error(
599 StatusCode::SERVICE_UNAVAILABLE,
600 "InternalFailure",
601 "Container runtime is not configured; cannot start DB instance",
602 ));
603 };
604
605 {
612 let state_handle = self.state.clone();
613 let delivery_bus = self.delivery_bus.clone();
614 let snapshot_store = self.snapshot_store.clone();
615 let snapshot_lock = self.snapshot_lock.clone();
616 let id = db_instance_identifier.clone();
617 let account_id = request.account_id.clone();
618 let region = request.region.clone();
619 let inst = instance.clone();
620 tokio::spawn(async move {
621 let logical_db = inst
622 .db_name
623 .clone()
624 .unwrap_or_else(|| default_db_name(&inst.engine).to_string());
625 match runtime
626 .ensure_postgres(
627 &id,
628 &inst.engine,
629 &inst.engine_version,
630 &inst.master_username,
631 &inst.master_user_password,
632 &logical_db,
633 &account_id,
634 ®ion,
635 &inst.tags,
636 )
637 .await
638 {
639 Ok(r) => {
640 let arn = {
641 let mut accounts = state_handle.write();
642 let state = accounts.get_or_create(&account_id);
643 let Some(inst) = state.instances.get_mut(&id) else {
644 return;
645 };
646 inst.db_instance_status = "available".to_string();
647 inst.endpoint_address = r.endpoint_address.clone();
648 inst.port = i32::from(r.endpoint_port);
649 inst.host_port = r.host_port;
650 inst.container_id = r.container_id;
651 inst.db_instance_arn.clone()
652 };
653 emit_event_static_with_state(
654 delivery_bus.as_ref(),
655 Some(&state_handle),
656 Some(&account_id),
657 RdsSourceType::DbInstance,
658 &id,
659 &arn,
660 "RDS-EVENT-0088",
661 &["notification"],
662 "DB instance started",
663 );
664 save_snapshot_static(state_handle.clone(), snapshot_store, snapshot_lock)
665 .await;
666 }
667 Err(_) => {
668 let mut accounts = state_handle.write();
671 let state = accounts.get_or_create(&account_id);
672 if let Some(inst) = state.instances.get_mut(&id) {
673 inst.db_instance_status = "stopped".to_string();
674 }
675 }
676 }
677 });
678 }
679
680 Ok(AwsResponse::xml(
681 StatusCode::OK,
682 query_response_xml(
683 "StartDBInstance",
684 RDS_NS,
685 &format!(
686 "<DBInstance>{}</DBInstance>",
687 db_instance_xml(&instance, Some("starting"))
688 ),
689 &request.request_id,
690 ),
691 ))
692 }
693}
694
/// Returns `true` when `code` is exactly one of the error codes in the table
/// below (case-sensitive).
///
/// NOTE(review): judging by the name, these are presumably the error codes
/// declared for `AddTagsToResource` — confirm against the caller.
fn is_declared_add_tags_not_found(code: &str) -> bool {
    const DECLARED_CODES: [&str; 14] = [
        "BlueGreenDeploymentNotFoundFault",
        "DBClusterNotFoundFault",
        "DBInstanceNotFound",
        "DBProxyEndpointNotFoundFault",
        "DBProxyNotFoundFault",
        "DBProxyTargetGroupNotFoundFault",
        "DBShardGroupNotFound",
        "DBSnapshotNotFound",
        "DBSnapshotTenantDatabaseNotFoundFault",
        "IntegrationNotFoundFault",
        "InvalidDBClusterEndpointStateFault",
        "InvalidDBClusterStateFault",
        "InvalidDBInstanceState",
        "TenantDatabaseNotFound",
    ];
    DECLARED_CODES.iter().any(|known| *known == code)
}
721
722async fn save_snapshot_static(
724 state: SharedRdsState,
725 store: Option<Arc<dyn SnapshotStore>>,
726 lock: Arc<AsyncMutex<()>>,
727) {
728 let Some(store) = store else {
729 return;
730 };
731 let _guard = lock.lock().await;
732 let snapshot = RdsSnapshot {
733 schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
734 state: None,
735 accounts: Some(state.read().clone()),
736 };
737 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
738 let bytes = serde_json::to_vec(&snapshot)
739 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
740 store.save(&bytes)
741 })
742 .await;
743 match join {
744 Ok(Ok(())) => {}
745 Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
746 Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
747 }
748}
749
750impl RdsService {
751 fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
762 self.runtime.as_ref().ok_or_else(|| {
763 AwsServiceError::aws_error(
764 StatusCode::SERVICE_UNAVAILABLE,
765 "InsufficientDBInstanceCapacity",
766 "Docker/Podman is required for RDS DB instances but is not available",
767 )
768 })
769 }
770
    /// Builds a hook that persists the RDS state on demand.
    ///
    /// Returns `None` when no snapshot store is attached. Otherwise the
    /// returned closure, each time it is invoked, clones the state/store/lock
    /// handles and yields a boxed future that runs `save_snapshot_static`.
    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
        // `?` short-circuits to `None` when there is nowhere to write to.
        let store = self.snapshot_store.clone()?;
        let state = self.state.clone();
        let lock = self.snapshot_lock.clone();
        Some(Arc::new(move || {
            // Fresh clones per invocation: the closure keeps its own handles
            // so it can be called repeatedly.
            let state = state.clone();
            let store = store.clone();
            let lock = lock.clone();
            Box::pin(async move {
                save_snapshot_static(state, Some(store), lock).await;
            })
        }))
    }
787}
788
/// `AwsService` implementation: routes incoming requests by action name.
#[async_trait]
impl AwsService for RdsService {
    /// Service identifier: `"rds"`.
    fn service_name(&self) -> &str {
        "rds"
    }

    /// Dispatches `request` to the handler for `request.action`.
    ///
    /// The request is first run through `crate::validation::prevalidate`,
    /// whose error is returned as-is. Actions without an explicit arm are
    /// forwarded to `handle_extra_action`. When `is_mutating_action` reports
    /// the action as mutating and the handler returned a response with a
    /// success status, the state is snapshotted before the result is returned.
    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        crate::validation::prevalidate(request.action.as_str(), &request)?;

        // Decided up front so the match below only has to produce the result.
        let mutates = is_mutating_action(request.action.as_str());
        let result = match request.action.as_str() {
            "AddTagsToResource" => self.add_tags_to_resource(&request),
            "CreateDBInstance" => self.create_db_instance(&request).await,
            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
            "DeleteDBInstance" => self.delete_db_instance(&request).await,
            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
            "DescribeDBInstances" => self.describe_db_instances(&request),
            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
            "DescribeDBParameters" => self.describe_db_parameters_real(&request),
            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
            "DescribeOrderableDBInstanceOptions" => {
                self.describe_orderable_db_instance_options(&request)
            }
            "ListTagsForResource" => self.list_tags_for_resource(&request),
            "ModifyDBInstance" => self.modify_db_instance(&request),
            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
            "RebootDBInstance" => self.reboot_db_instance(&request).await,
            "StartDBInstance" => self.start_db_instance(&request).await,
            "StopDBInstance" => self.stop_db_instance(&request).await,
            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
            "RestoreDBInstanceFromDBSnapshot" => {
                self.restore_db_instance_from_db_snapshot(&request).await
            }
            "RestoreDBInstanceToPointInTime" => {
                self.restore_db_instance_to_point_in_time(&request).await
            }
            "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
            "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
            "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
            "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
            "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
            "RestoreDBClusterToPointInTime" => {
                self.restore_db_cluster_to_point_in_time(&request).await
            }
            // Everything else is handled by the catch-all handler.
            _ => self.handle_extra_action(&request),
        };
        // Failed or non-mutating requests are not persisted.
        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
            self.save_snapshot().await;
        }
        result
    }

    /// The action names this service advertises (`SUPPORTED_ACTIONS`).
    fn supported_actions(&self) -> &[&str] {
        SUPPORTED_ACTIONS
    }
}
857
// NOTE(review): this impl block is empty and has no effect; it looks like a
// leftover from moving methods elsewhere — confirm and delete if so.
impl RdsService {}
859
/// Renders one `<Parameter>` XML element for a user-supplied parameter.
///
/// `name` and `value` are XML-escaped. The remaining fields are fixed:
/// `Source` is `user`, `ApplyType` is `dynamic`, `DataType` is `string` and
/// `IsModifiable` is `true`.
pub(crate) fn render_user_parameter_xml(name: &str, value: &str) -> String {
    format!(
        " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>user</Source>\n <ApplyType>dynamic</ApplyType>\n <DataType>string</DataType>\n <IsModifiable>true</IsModifiable>\n </Parameter>\n",
        xml_escape(name),
        xml_escape(value),
    )
}
870
/// Renders one `<Parameter>` XML element for an engine default parameter.
///
/// `Source` is always `engine-default`. Name, value, apply type, data type
/// and allowed values are taken from `default` and XML-escaped;
/// `is_modifiable` is formatted as-is without escaping.
pub(crate) fn render_engine_default_parameter_xml(
    default: &crate::state::EngineDefaultParameter,
) -> String {
    format!(
        " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>engine-default</Source>\n <ApplyType>{}</ApplyType>\n <DataType>{}</DataType>\n <AllowedValues>{}</AllowedValues>\n <IsModifiable>{}</IsModifiable>\n </Parameter>\n",
        xml_escape(default.name),
        xml_escape(default.value),
        xml_escape(default.apply_type),
        xml_escape(default.data_type),
        xml_escape(default.allowed_values),
        default.is_modifiable,
    )
}
887
888pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<(String, String)> {
896 let mut out = Vec::new();
897 for prefix in ["Parameters.Parameter", "Parameters.member"] {
898 let mut index = 1;
899 loop {
900 let name_key = format!("{prefix}.{index}.ParameterName");
901 let value_key = format!("{prefix}.{index}.ParameterValue");
902 let name = optional_query_param(request, &name_key);
903 let value = optional_query_param(request, &value_key);
904 if name.is_none() && value.is_none() {
905 break;
906 }
907 if let (Some(n), Some(v)) = (name, value) {
908 if !n.is_empty() {
909 out.push((n, v));
910 }
911 }
912 index += 1;
913 }
914 }
915 out
916}
917
/// Translates a log file name into a path inside the backing container.
///
/// * `error/postgres.log` and `trace/postgres-trace.log` map to the
///   PostgreSQL log path for any engine.
/// * `error/mysql-error.log` and `slowquery/mysql-slowquery.log` map to the
///   MySQL error / slow-query log paths, but only when `engine` is `mysql` or
///   `mariadb`.
/// * Anything else is returned unchanged.
fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
    let mysql_family = matches!(engine, "mysql" | "mariadb");
    let container_path = match log_file_name {
        "error/postgres.log" | "trace/postgres-trace.log" => "/var/log/postgresql/postgresql.log",
        "error/mysql-error.log" if mysql_family => "/var/log/mysql/error.log",
        "slowquery/mysql-slowquery.log" if mysql_family => "/var/log/mysql/slow.log",
        unmapped => unmapped,
    };
    container_path.to_string()
}
932
/// One page of results plus an optional continuation marker.
pub(crate) struct PaginationResult<T> {
    /// Items on this page.
    items: Vec<T>,
    /// NOTE(review): presumably the marker a client passes to fetch the next
    /// page, `None` on the last page — confirm against the code that builds it.
    next_marker: Option<String>,
}
937
938fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
942 use serde_json::{json, Value};
943 let Some(map) = state.extras.get_mut("clusters") else {
944 return;
945 };
946 let Some(entry) = map.get_mut(cluster_id) else {
947 return;
948 };
949 let Some(obj) = entry.as_object_mut() else {
950 return;
951 };
952 let mut members: Vec<Value> = obj
953 .get("DBClusterMembers")
954 .and_then(|v| v.as_array())
955 .cloned()
956 .unwrap_or_default();
957 if members
958 .iter()
959 .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
960 {
961 return;
962 }
963 let has_writer = members
964 .iter()
965 .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
966 let promotion_tier = (members.len() as i64) + 1;
967 members.push(json!({
968 "DBInstanceIdentifier": instance_id,
969 "IsClusterWriter": !has_writer,
970 "DBClusterParameterGroupStatus": "in-sync",
971 "PromotionTier": promotion_tier,
972 }));
973 obj.insert("DBClusterMembers".to_string(), Value::Array(members));
974 if !has_writer {
975 obj.insert(
976 "WriterDBInstanceIdentifier".to_string(),
977 Value::String(instance_id.to_string()),
978 );
979 }
980}
981
982#[path = "../service_helpers.rs"]
983mod service_helpers;
984pub(crate) use service_helpers::*;
985
986#[cfg(test)]
987#[path = "../service_tests.rs"]
988mod tests;