1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::query::{optional_query_param, query_response_xml, required_query_param};
13use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
14use fakecloud_persistence::SnapshotStore;
15
16use crate::runtime::{RdsRuntime, RuntimeError};
17use crate::state::{
18 default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
19 DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
20 SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
21};
22
/// XML namespace passed to `query_response_xml` when building RDS query responses.
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
/// Action names reported by `RdsService::supported_actions`.
///
/// Only part of this list is dispatched by name in `RdsService::handle`; every
/// other action falls through to `handle_extra_action`.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddRoleToDBCluster",
    "AddRoleToDBInstance",
    "AddSourceIdentifierToSubscription",
    "AddTagsToResource",
    "ApplyPendingMaintenanceAction",
    "AuthorizeDBSecurityGroupIngress",
    "BacktrackDBCluster",
    "CancelExportTask",
    "CopyDBClusterParameterGroup",
    "CopyDBClusterSnapshot",
    "CopyDBParameterGroup",
    "CopyDBSnapshot",
    "CopyOptionGroup",
    "CreateBlueGreenDeployment",
    "CreateCustomDBEngineVersion",
    "CreateDBCluster",
    "CreateDBClusterEndpoint",
    "CreateDBClusterParameterGroup",
    "CreateDBClusterSnapshot",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBProxy",
    "CreateDBProxyEndpoint",
    "CreateDBSecurityGroup",
    "CreateDBShardGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "CreateEventSubscription",
    "CreateGlobalCluster",
    "CreateIntegration",
    "CreateOptionGroup",
    "CreateTenantDatabase",
    "DeleteBlueGreenDeployment",
    "DeleteCustomDBEngineVersion",
    "DeleteDBCluster",
    "DeleteDBClusterAutomatedBackup",
    "DeleteDBClusterEndpoint",
    "DeleteDBClusterParameterGroup",
    "DeleteDBClusterSnapshot",
    "DeleteDBInstance",
    "DeleteDBInstanceAutomatedBackup",
    "DeleteDBParameterGroup",
    "DeleteDBProxy",
    "DeleteDBProxyEndpoint",
    "DeleteDBSecurityGroup",
    "DeleteDBShardGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DeleteEventSubscription",
    "DeleteGlobalCluster",
    "DeleteIntegration",
    "DeleteOptionGroup",
    "DeleteTenantDatabase",
    "DeregisterDBProxyTargets",
    "DescribeAccountAttributes",
    "DescribeBlueGreenDeployments",
    "DescribeCertificates",
    "DescribeDBClusterAutomatedBackups",
    "DescribeDBClusterBacktracks",
    "DescribeDBClusterEndpoints",
    "DescribeDBClusterParameterGroups",
    "DescribeDBClusterParameters",
    "DescribeDBClusterSnapshotAttributes",
    "DescribeDBClusterSnapshots",
    "DescribeDBClusters",
    "DescribeDBEngineVersions",
    "DescribeDBInstanceAutomatedBackups",
    "DescribeDBInstances",
    "DescribeDBLogFiles",
    "DescribeDBMajorEngineVersions",
    "DescribeDBParameterGroups",
    "DescribeDBParameters",
    "DescribeDBProxies",
    "DescribeDBProxyEndpoints",
    "DescribeDBProxyTargetGroups",
    "DescribeDBProxyTargets",
    "DescribeDBRecommendations",
    "DescribeDBSecurityGroups",
    "DescribeDBShardGroups",
    "DescribeDBSnapshotAttributes",
    "DescribeDBSnapshotTenantDatabases",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeEngineDefaultClusterParameters",
    "DescribeEngineDefaultParameters",
    "DescribeEventCategories",
    "DescribeEventSubscriptions",
    "DescribeEvents",
    "DescribeExportTasks",
    "DescribeGlobalClusters",
    "DescribeIntegrations",
    "DescribeOptionGroupOptions",
    "DescribeOptionGroups",
    "DescribeOrderableDBInstanceOptions",
    "DescribePendingMaintenanceActions",
    "DescribeReservedDBInstances",
    "DescribeReservedDBInstancesOfferings",
    "DescribeServerlessV2PlatformVersions",
    "DescribeSourceRegions",
    "DescribeTenantDatabases",
    "DescribeValidDBInstanceModifications",
    "DisableHttpEndpoint",
    "DownloadDBLogFilePortion",
    "EnableHttpEndpoint",
    "FailoverDBCluster",
    "FailoverGlobalCluster",
    "ListTagsForResource",
    "ModifyActivityStream",
    "ModifyCertificates",
    "ModifyCurrentDBClusterCapacity",
    "ModifyCustomDBEngineVersion",
    "ModifyDBCluster",
    "ModifyDBClusterEndpoint",
    "ModifyDBClusterParameterGroup",
    "ModifyDBClusterSnapshotAttribute",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBProxy",
    "ModifyDBProxyEndpoint",
    "ModifyDBProxyTargetGroup",
    "ModifyDBRecommendation",
    "ModifyDBShardGroup",
    "ModifyDBSnapshot",
    "ModifyDBSnapshotAttribute",
    "ModifyDBSubnetGroup",
    "ModifyEventSubscription",
    "ModifyGlobalCluster",
    "ModifyIntegration",
    "ModifyOptionGroup",
    "ModifyTenantDatabase",
    "PromoteReadReplica",
    "PromoteReadReplicaDBCluster",
    "PurchaseReservedDBInstancesOffering",
    "RebootDBCluster",
    "RebootDBInstance",
    "RebootDBShardGroup",
    "RegisterDBProxyTargets",
    "RemoveFromGlobalCluster",
    "RemoveRoleFromDBCluster",
    "RemoveRoleFromDBInstance",
    "RemoveSourceIdentifierFromSubscription",
    "RemoveTagsFromResource",
    "ResetDBClusterParameterGroup",
    "ResetDBParameterGroup",
    "RestoreDBClusterFromS3",
    "RestoreDBClusterFromSnapshot",
    "RestoreDBClusterToPointInTime",
    "RestoreDBInstanceFromDBSnapshot",
    "RestoreDBInstanceFromS3",
    "RestoreDBInstanceToPointInTime",
    "RevokeDBSecurityGroupIngress",
    "StartActivityStream",
    "StartDBCluster",
    "StartDBInstance",
    "StartDBInstanceAutomatedBackupsReplication",
    "StartExportTask",
    "StopActivityStream",
    "StopDBCluster",
    "StopDBInstance",
    "StopDBInstanceAutomatedBackupsReplication",
    "SwitchoverBlueGreenDeployment",
    "SwitchoverGlobalCluster",
    "SwitchoverReadReplica",
];
191
/// RDS service front end: owns the shared state handle plus the optional
/// collaborators (container runtime, snapshot store, delivery bus).
pub struct RdsService {
    /// Shared, lock-protected RDS state; accessed through `read()` / `write()`.
    pub(crate) state: SharedRdsState,
    /// Container runtime backing DB instances. When `None`, container recovery
    /// is skipped and `StartDBInstance` returns an error.
    runtime: Option<Arc<RdsRuntime>>,
    /// Destination for serialized state snapshots. When `None`, saving is a no-op.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of each snapshot write.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Bus handed to the event-emission helpers, if one is configured.
    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
}
199
/// Kind of RDS resource an emitted event refers to.
///
/// Every variant has three fixed string forms, exposed through
/// [`RdsSourceType::as_str`], [`RdsSourceType::describe_events_str`] and
/// [`RdsSourceType::detail_type`].
#[derive(Clone, Copy)]
// NOTE(review): presumably allowed because not every variant is constructed
// yet and all variants share the `Db` prefix — confirm before removing.
#[allow(dead_code, clippy::enum_variant_names)]
pub(crate) enum RdsSourceType {
    DbInstance,
    DbSnapshot,
    DbParameterGroup,
    DbCluster,
    DbClusterSnapshot,
}

impl RdsSourceType {
    /// Single lookup table behind the three public accessors.
    ///
    /// Returns `(upper-snake form, kebab-case form, detail-type text)`.
    fn labels(self) -> (&'static str, &'static str, &'static str) {
        match self {
            Self::DbInstance => ("DB_INSTANCE", "db-instance", "RDS DB Instance Event"),
            Self::DbSnapshot => ("DB_SNAPSHOT", "db-snapshot", "RDS DB Snapshot Event"),
            Self::DbParameterGroup => (
                "DB_PARAMETER_GROUP",
                "db-parameter-group",
                "RDS DB Parameter Group Event",
            ),
            Self::DbCluster => ("DB_CLUSTER", "db-cluster", "RDS DB Cluster Event"),
            Self::DbClusterSnapshot => (
                "DB_CLUSTER_SNAPSHOT",
                "db-cluster-snapshot",
                "RDS DB Cluster Snapshot Event",
            ),
        }
    }

    /// Upper-snake form, e.g. `DB_INSTANCE`.
    fn as_str(self) -> &'static str {
        let (upper_snake, _, _) = self.labels();
        upper_snake
    }

    /// Lower-case hyphenated form, e.g. `db-instance`.
    pub(crate) fn describe_events_str(self) -> &'static str {
        let (_, kebab, _) = self.labels();
        kebab
    }

    /// Human-readable detail-type text, e.g. `RDS DB Instance Event`.
    fn detail_type(self) -> &'static str {
        let (_, _, detail) = self.labels();
        detail
    }
}
248
249mod cluster_snapshots;
250mod engine;
251mod instances;
252mod log_files;
253mod parameter_groups;
254mod replicas;
255mod restore;
256mod snapshots;
257mod subnet_groups;
258mod tags;
259
260impl RdsService {
261 pub(crate) fn state_handle(&self) -> &SharedRdsState {
262 &self.state
263 }
264}
265
266impl RdsService {
267 pub fn new(state: SharedRdsState) -> Self {
268 Self {
269 state,
270 runtime: None,
271 snapshot_store: None,
272 snapshot_lock: Arc::new(AsyncMutex::new(())),
273 delivery_bus: None,
274 }
275 }
276
277 pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
278 self.runtime = Some(runtime);
279 self
280 }
281
282 pub(crate) fn runtime_ref(&self) -> Option<&Arc<RdsRuntime>> {
286 self.runtime.as_ref()
287 }
288
289 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
290 self.snapshot_store = Some(store);
291 self
292 }
293
294 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
295 self.delivery_bus = Some(bus);
296 self
297 }
298
299 pub(crate) fn emit_event(
304 &self,
305 source_type: RdsSourceType,
306 source_identifier: &str,
307 source_arn: &str,
308 event_id: &str,
309 event_categories: &[&str],
310 message: &str,
311 ) {
312 let account_id = source_arn.split(':').nth(4).unwrap_or("");
315 emit_event_static_with_state(
316 self.delivery_bus.as_ref(),
317 Some(&self.state),
318 if account_id.is_empty() {
319 None
320 } else {
321 Some(account_id)
322 },
323 source_type,
324 source_identifier,
325 source_arn,
326 event_id,
327 event_categories,
328 message,
329 );
330 }
331
332 async fn save_snapshot(&self) {
333 save_snapshot_static(
334 self.state.clone(),
335 self.snapshot_store.clone(),
336 self.snapshot_lock.clone(),
337 )
338 .await;
339 }
340
/// Brings backing containers back up for DB instances found in the state.
///
/// Returns immediately when no container runtime is configured. Otherwise
/// every instance whose status is `creating`, `available`, `starting`,
/// `modifying`, `rebooting` or `backing-up` is marked `starting`, and one
/// detached tokio task per instance calls `ensure_postgres`. A task that
/// succeeds marks the instance `available`, records the endpoint and
/// container details, saves a snapshot and emits `RDS-EVENT-0088`. A task
/// that fails logs the error, marks the instance `failed` and saves a
/// snapshot. The spawned tasks are not awaited here.
pub async fn recover_persisted_containers(&self) {
    // Nothing can be started without a runtime.
    let Some(runtime) = self.runtime.clone() else {
        return;
    };

    // Owned copy of what a recovery task needs, so the tasks below do not
    // borrow from the locked state.
    struct Pending {
        account_id: String,
        region: String,
        id: String,
        arn: String,
        engine: String,
        engine_version: String,
        username: String,
        password: String,
        db_name: String,
    }

    // Collect candidates and flip them to "starting" under one write lock;
    // the guard is dropped at the end of this block, before any task runs.
    let pending: Vec<Pending> = {
        let mut accounts = self.state.write();
        let mut out = Vec::new();
        for (_, state) in accounts.iter_mut() {
            let account_id = state.account_id.clone();
            let region = state.region.clone();
            for (id, inst) in state.instances.iter_mut() {
                // Instances in any other status (e.g. "stopped") are left untouched.
                if !matches!(
                    inst.db_instance_status.as_str(),
                    "creating"
                        | "available"
                        | "starting"
                        | "modifying"
                        | "rebooting"
                        | "backing-up"
                ) {
                    continue;
                }
                inst.db_instance_status = "starting".to_string();
                out.push(Pending {
                    account_id: account_id.clone(),
                    region: region.clone(),
                    id: id.clone(),
                    arn: inst.db_instance_arn.clone(),
                    engine: inst.engine.clone(),
                    engine_version: inst.engine_version.clone(),
                    username: inst.master_username.clone(),
                    password: inst.master_user_password.clone(),
                    // Fall back to the engine's default database name when none was stored.
                    db_name: inst
                        .db_name
                        .clone()
                        .unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
                });
            }
        }
        out
    };

    if pending.is_empty() {
        return;
    }
    tracing::info!(
        count = pending.len(),
        "recovering backing containers for persisted rds instances",
    );

    for p in pending {
        // Each task gets its own clones of the shared handles.
        let runtime = runtime.clone();
        let state = self.state.clone();
        let snapshot_store = self.snapshot_store.clone();
        let snapshot_lock = self.snapshot_lock.clone();
        let delivery_bus = self.delivery_bus.clone();
        // The JoinHandle is discarded: recovery continues in the background.
        tokio::spawn(async move {
            match runtime
                .ensure_postgres(
                    &p.id,
                    &p.engine,
                    &p.engine_version,
                    &p.username,
                    &p.password,
                    &p.db_name,
                    &p.account_id,
                    &p.region,
                )
                .await
            {
                Ok(running) => {
                    // Scoped so the write guard is released before the awaits below.
                    {
                        let mut accounts = state.write();
                        if let Some(s) = accounts.get_mut(&p.account_id) {
                            if let Some(inst) = s.instances.get_mut(&p.id) {
                                inst.db_instance_status = "available".to_string();
                                inst.endpoint_address = running.endpoint_address.clone();
                                inst.port = i32::from(running.endpoint_port);
                                inst.host_port = running.host_port;
                                inst.container_id = running.container_id;
                            }
                        }
                    }
                    save_snapshot_static(
                        state.clone(),
                        snapshot_store.clone(),
                        snapshot_lock.clone(),
                    )
                    .await;
                    emit_event_static(
                        delivery_bus.as_ref(),
                        RdsSourceType::DbInstance,
                        &p.id,
                        &p.arn,
                        "RDS-EVENT-0088",
                        &["notification"],
                        "DB instance restarted after fakecloud restart",
                    );
                }
                Err(error) => {
                    tracing::error!(
                        %error,
                        db_instance_identifier = %p.id,
                        "failed to recover rds backing container after restart",
                    );
                    // Record the failure on the instance, then persist it.
                    {
                        let mut accounts = state.write();
                        if let Some(s) = accounts.get_mut(&p.account_id) {
                            if let Some(inst) = s.instances.get_mut(&p.id) {
                                inst.db_instance_status = "failed".to_string();
                            }
                        }
                    }
                    save_snapshot_static(state, snapshot_store, snapshot_lock).await;
                }
            }
        });
    }
}
488
489 async fn stop_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
495 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
496
497 let arn = {
502 let accounts = self.state.read();
503 let empty = RdsState::new(&request.account_id, &request.region);
504 let state = accounts.get(&request.account_id).unwrap_or(&empty);
505 state
506 .instances
507 .get(&db_instance_identifier)
508 .map(|i| i.db_instance_arn.clone())
509 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
510 };
511
512 if let Some(runtime) = self.runtime.as_ref() {
513 runtime.stop_container(&db_instance_identifier).await;
514 }
515
516 let instance = {
517 let mut accounts = self.state.write();
518 let state = accounts.get_or_create(&request.account_id);
519 let inst = state
520 .instances
521 .get_mut(&db_instance_identifier)
522 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
523 inst.db_instance_status = "stopped".to_string();
524 inst.container_id = String::new();
525 inst.clone()
526 };
527
528 self.emit_event(
529 RdsSourceType::DbInstance,
530 &db_instance_identifier,
531 &arn,
532 "RDS-EVENT-0089",
533 &["notification"],
534 "DB instance stopped",
535 );
536
537 Ok(AwsResponse::xml(
538 StatusCode::OK,
539 query_response_xml(
540 "StopDBInstance",
541 RDS_NS,
542 &format!(
543 "<DBInstance>{}</DBInstance>",
544 db_instance_xml(&instance, Some("stopped"))
545 ),
546 &request.request_id,
547 ),
548 ))
549 }
550
551 async fn start_db_instance(
554 &self,
555 request: &AwsRequest,
556 ) -> Result<AwsResponse, AwsServiceError> {
557 let db_instance_identifier = required_query_param(request, "DBInstanceIdentifier")?;
558
559 let instance = {
563 let accounts = self.state.read();
564 let empty = RdsState::new(&request.account_id, &request.region);
565 let state = accounts.get(&request.account_id).unwrap_or(&empty);
566 state
567 .instances
568 .get(&db_instance_identifier)
569 .cloned()
570 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
571 };
572
573 {
576 let mut accounts = self.state.write();
577 let state = accounts.get_or_create(&request.account_id);
578 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
579 inst.db_instance_status = "starting".to_string();
580 }
581 }
582
583 let running = if let Some(runtime) = self.runtime.as_ref() {
584 match runtime
585 .ensure_postgres(
586 &db_instance_identifier,
587 &instance.engine,
588 &instance.engine_version,
589 &instance.master_username,
590 &instance.master_user_password,
591 instance
592 .db_name
593 .as_deref()
594 .unwrap_or(default_db_name(&instance.engine)),
595 &request.account_id,
596 &request.region,
597 )
598 .await
599 {
600 Ok(r) => Some(r),
601 Err(e) => {
602 let mut accounts = self.state.write();
606 let state = accounts.get_or_create(&request.account_id);
607 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
608 inst.db_instance_status = "stopped".to_string();
609 }
610 return Err(runtime_error_to_service_error(e));
611 }
612 }
613 } else {
614 {
619 let mut accounts = self.state.write();
620 let state = accounts.get_or_create(&request.account_id);
621 if let Some(inst) = state.instances.get_mut(&db_instance_identifier) {
622 inst.db_instance_status = "stopped".to_string();
623 }
624 }
625 return Err(AwsServiceError::aws_error(
626 StatusCode::SERVICE_UNAVAILABLE,
627 "InternalFailure",
628 "Container runtime is not configured; cannot start DB instance",
629 ));
630 };
631
632 let instance = {
633 let mut accounts = self.state.write();
634 let state = accounts.get_or_create(&request.account_id);
635 let inst = state
636 .instances
637 .get_mut(&db_instance_identifier)
638 .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
639 inst.db_instance_status = "available".to_string();
640 inst.endpoint_address = "127.0.0.1".to_string();
641 if let Some(r) = running {
642 inst.endpoint_address = r.endpoint_address.clone();
643 inst.port = i32::from(r.endpoint_port);
644 inst.host_port = r.host_port;
645 inst.container_id = r.container_id;
646 }
647 inst.clone()
648 };
649
650 self.emit_event(
651 RdsSourceType::DbInstance,
652 &db_instance_identifier,
653 &instance.db_instance_arn,
654 "RDS-EVENT-0088",
655 &["notification"],
656 "DB instance started",
657 );
658
659 Ok(AwsResponse::xml(
660 StatusCode::OK,
661 query_response_xml(
662 "StartDBInstance",
663 RDS_NS,
664 &format!(
665 "<DBInstance>{}</DBInstance>",
666 db_instance_xml(&instance, None)
667 ),
668 &request.request_id,
669 ),
670 ))
671 }
672}
673
/// Returns `true` when `code` exactly matches one of the error codes in the
/// table below (the comparison is case-sensitive).
///
/// NOTE(review): going by the name, this is presumably the set of fault codes
/// `AddTagsToResource` is allowed to surface — confirm against the caller.
fn is_declared_add_tags_not_found(code: &str) -> bool {
    const DECLARED_CODES: &[&str] = &[
        "BlueGreenDeploymentNotFoundFault",
        "DBClusterNotFoundFault",
        "DBInstanceNotFound",
        "DBProxyEndpointNotFoundFault",
        "DBProxyNotFoundFault",
        "DBProxyTargetGroupNotFoundFault",
        "DBShardGroupNotFound",
        "DBSnapshotNotFound",
        "DBSnapshotTenantDatabaseNotFoundFault",
        "IntegrationNotFoundFault",
        "InvalidDBClusterEndpointStateFault",
        "InvalidDBClusterStateFault",
        "InvalidDBInstanceState",
        "TenantDatabaseNotFound",
    ];
    DECLARED_CODES.iter().any(|declared| *declared == code)
}
700
701async fn save_snapshot_static(
703 state: SharedRdsState,
704 store: Option<Arc<dyn SnapshotStore>>,
705 lock: Arc<AsyncMutex<()>>,
706) {
707 let Some(store) = store else {
708 return;
709 };
710 let _guard = lock.lock().await;
711 let snapshot = RdsSnapshot {
712 schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
713 state: None,
714 accounts: Some(state.read().clone()),
715 };
716 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
717 let bytes = serde_json::to_vec(&snapshot)
718 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
719 store.save(&bytes)
720 })
721 .await;
722 match join {
723 Ok(Ok(())) => {}
724 Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
725 Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
726 }
727}
728
729impl RdsService {
730 fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
741 self.runtime.as_ref().ok_or_else(|| {
742 AwsServiceError::aws_error(
743 StatusCode::SERVICE_UNAVAILABLE,
744 "InsufficientDBInstanceCapacity",
745 "Docker/Podman is required for RDS DB instances but is not available",
746 )
747 })
748 }
749}
750
751#[async_trait]
752impl AwsService for RdsService {
753 fn service_name(&self) -> &str {
754 "rds"
755 }
756
757 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
758 crate::validation::prevalidate(request.action.as_str(), &request)?;
763
764 let mutates = is_mutating_action(request.action.as_str());
765 let result = match request.action.as_str() {
766 "AddTagsToResource" => self.add_tags_to_resource(&request),
767 "CreateDBInstance" => self.create_db_instance(&request).await,
768 "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
769 "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
770 "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
771 "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
772 "DeleteDBInstance" => self.delete_db_instance(&request).await,
773 "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
774 "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
775 "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
776 "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
777 "DescribeDBInstances" => self.describe_db_instances(&request),
778 "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
779 "DescribeDBParameters" => self.describe_db_parameters_real(&request),
780 "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
781 "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
782 "DescribeOrderableDBInstanceOptions" => {
783 self.describe_orderable_db_instance_options(&request)
784 }
785 "ListTagsForResource" => self.list_tags_for_resource(&request),
786 "ModifyDBInstance" => self.modify_db_instance(&request),
787 "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
788 "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
789 "RebootDBInstance" => self.reboot_db_instance(&request).await,
790 "StartDBInstance" => self.start_db_instance(&request).await,
791 "StopDBInstance" => self.stop_db_instance(&request).await,
792 "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
793 "RestoreDBInstanceFromDBSnapshot" => {
794 self.restore_db_instance_from_db_snapshot(&request).await
795 }
796 "RestoreDBInstanceToPointInTime" => {
797 self.restore_db_instance_to_point_in_time(&request).await
798 }
799 "RestoreDBInstanceFromS3" => self.restore_db_instance_from_s3(&request).await,
800 "DescribeDBLogFiles" => self.describe_db_log_files(&request).await,
801 "DownloadDBLogFilePortion" => self.download_db_log_file_portion(&request).await,
802 "CreateDBClusterSnapshot" => self.create_db_cluster_snapshot(&request).await,
803 "RestoreDBClusterFromSnapshot" => self.restore_db_cluster_from_snapshot(&request).await,
804 "RestoreDBClusterToPointInTime" => {
805 self.restore_db_cluster_to_point_in_time(&request).await
806 }
807 _ => self.handle_extra_action(&request),
808 };
809 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
810 self.save_snapshot().await;
811 }
812 result
813 }
814
815 fn supported_actions(&self) -> &[&str] {
816 SUPPORTED_ACTIONS
817 }
818}
819
// NOTE(review): this impl block is empty and defines no items; it can
// probably be deleted — confirm nothing generated or included relies on it.
impl RdsService {}
821
822pub(crate) fn render_user_parameter_xml(name: &str, value: &str) -> String {
826 format!(
827 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>user</Source>\n <ApplyType>dynamic</ApplyType>\n <DataType>string</DataType>\n <IsModifiable>true</IsModifiable>\n </Parameter>\n",
828 xml_escape(name),
829 xml_escape(value),
830 )
831}
832
833pub(crate) fn render_engine_default_parameter_xml(
837 default: &crate::state::EngineDefaultParameter,
838) -> String {
839 format!(
840 " <Parameter>\n <ParameterName>{}</ParameterName>\n <ParameterValue>{}</ParameterValue>\n <Source>engine-default</Source>\n <ApplyType>{}</ApplyType>\n <DataType>{}</DataType>\n <AllowedValues>{}</AllowedValues>\n <IsModifiable>{}</IsModifiable>\n </Parameter>\n",
841 xml_escape(default.name),
842 xml_escape(default.value),
843 xml_escape(default.apply_type),
844 xml_escape(default.data_type),
845 xml_escape(default.allowed_values),
846 default.is_modifiable,
847 )
848}
849
850pub(crate) fn parse_db_parameter_members(request: &AwsRequest) -> Vec<(String, String)> {
858 let mut out = Vec::new();
859 for prefix in ["Parameters.Parameter", "Parameters.member"] {
860 let mut index = 1;
861 loop {
862 let name_key = format!("{prefix}.{index}.ParameterName");
863 let value_key = format!("{prefix}.{index}.ParameterValue");
864 let name = optional_query_param(request, &name_key);
865 let value = optional_query_param(request, &value_key);
866 if name.is_none() && value.is_none() {
867 break;
868 }
869 if let (Some(n), Some(v)) = (name, value) {
870 if !n.is_empty() {
871 out.push((n, v));
872 }
873 }
874 index += 1;
875 }
876 }
877 out
878}
879
/// Maps a log file name to the path of that log inside the container.
///
/// The two PostgreSQL names map to the PostgreSQL log whatever the engine is.
/// The MySQL names are mapped only for the `mysql` and `mariadb` engines.
/// Any other combination returns `log_file_name` unchanged.
fn map_log_file_to_container_path(engine: &str, log_file_name: &str) -> String {
    let mysql_family = matches!(engine, "mysql" | "mariadb");
    let container_path = match log_file_name {
        "error/postgres.log" | "trace/postgres-trace.log" => "/var/log/postgresql/postgresql.log",
        "error/mysql-error.log" if mysql_family => "/var/log/mysql/error.log",
        "slowquery/mysql-slowquery.log" if mysql_family => "/var/log/mysql/slow.log",
        unmapped => unmapped,
    };
    container_path.to_string()
}
894
/// A list of items together with an optional marker string.
///
/// NOTE(review): presumably one page of a paginated listing, with
/// `next_marker` pointing at the following page — confirm against the code
/// that builds it.
pub(crate) struct PaginationResult<T> {
    /// Items carried by this result.
    items: Vec<T>,
    /// Marker string, if any.
    next_marker: Option<String>,
}
899
900fn attach_cluster_member(state: &mut RdsState, cluster_id: &str, instance_id: &str) {
904 use serde_json::{json, Value};
905 let Some(map) = state.extras.get_mut("clusters") else {
906 return;
907 };
908 let Some(entry) = map.get_mut(cluster_id) else {
909 return;
910 };
911 let Some(obj) = entry.as_object_mut() else {
912 return;
913 };
914 let mut members: Vec<Value> = obj
915 .get("DBClusterMembers")
916 .and_then(|v| v.as_array())
917 .cloned()
918 .unwrap_or_default();
919 if members
920 .iter()
921 .any(|m| m["DBInstanceIdentifier"].as_str() == Some(instance_id))
922 {
923 return;
924 }
925 let has_writer = members
926 .iter()
927 .any(|m| m["IsClusterWriter"].as_bool() == Some(true));
928 let promotion_tier = (members.len() as i64) + 1;
929 members.push(json!({
930 "DBInstanceIdentifier": instance_id,
931 "IsClusterWriter": !has_writer,
932 "DBClusterParameterGroupStatus": "in-sync",
933 "PromotionTier": promotion_tier,
934 }));
935 obj.insert("DBClusterMembers".to_string(), Value::Array(members));
936 if !has_writer {
937 obj.insert(
938 "WriterDBInstanceIdentifier".to_string(),
939 Value::String(instance_id.to_string()),
940 );
941 }
942}
943
944#[path = "../service_helpers.rs"]
945mod service_helpers;
946pub(crate) use service_helpers::*;
947
948#[cfg(test)]
949#[path = "../service_tests.rs"]
950mod tests;