use super::*;
impl RdsService {
pub(super) async fn create_db_cluster_snapshot(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
use serde_json::json;
let snapshot_id = required_query_param(request, "DBClusterSnapshotIdentifier")?;
let cluster_id = required_query_param(request, "DBClusterIdentifier")?;
let arn = format!(
"arn:aws:rds:{}:{}:cluster-snapshot:{}",
request.region, request.account_id, snapshot_id
);
let writer_info = {
let accounts = self.state.read();
accounts.get(&request.account_id).and_then(|state| {
let cluster_entry = state.extras.get("clusters")?.get(&cluster_id)?;
let writer_id = cluster_entry
.get("WriterDBInstanceIdentifier")
.and_then(|v| v.as_str())
.map(str::to_string)
.or_else(|| {
cluster_entry
.get("DBClusterMembers")
.and_then(|m| m.as_array())
.and_then(|arr| {
arr.iter()
.find(|m| m["IsClusterWriter"].as_bool() == Some(true))
.or_else(|| arr.first())
.and_then(|m| m["DBInstanceIdentifier"].as_str())
.map(str::to_string)
})
})?;
let inst = state.instances.get(&writer_id)?;
Some((
inst.db_instance_identifier.clone(),
inst.engine.clone(),
inst.master_username.clone(),
inst.master_user_password.clone(),
inst.db_name
.clone()
.unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
))
})
};
let dump_b64 = if let Some((wid, eng, user, pass, db)) = writer_info {
if let Some(runtime) = self.runtime_ref() {
match runtime.dump_database(&wid, &eng, &user, &pass, &db).await {
Ok(data) => {
use base64::Engine;
Some(base64::engine::general_purpose::STANDARD.encode(&data))
}
Err(e) => {
tracing::warn!(
error = %e,
cluster = %cluster_id,
writer = %wid,
"cluster snapshot dump failed; falling back to metadata-only snapshot"
);
None
}
}
} else {
None
}
} else {
None
};
{
let mut accounts = self.state.write();
let state = accounts.get_or_create(&request.account_id);
let mut entry = state
.extras
.get("clusters")
.and_then(|m| m.get(&cluster_id))
.cloned()
.ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"DBClusterNotFoundFault",
format!("DBCluster {cluster_id} not found."),
)
})?;
if let Some(obj) = entry.as_object_mut() {
obj.insert(
"DBClusterSnapshotIdentifier".to_string(),
json!(snapshot_id),
);
obj.insert("DBClusterSnapshotArn".to_string(), json!(arn));
obj.insert("DBClusterIdentifier".to_string(), json!(cluster_id));
obj.insert("Status".to_string(), json!("available"));
obj.insert("SnapshotType".to_string(), json!("manual"));
if let Some(b64) = dump_b64.as_ref() {
obj.insert("DumpDataB64".to_string(), json!(b64));
}
}
state
.extras
.entry("cluster_snapshots".to_string())
.or_default()
.insert(snapshot_id.clone(), entry);
}
self.emit_event(
RdsSourceType::DbClusterSnapshot,
&snapshot_id,
&arn,
"RDS-EVENT-0074",
&["backup"],
"DB cluster snapshot created",
);
Ok(AwsResponse::xml(
StatusCode::OK,
query_response_xml(
"CreateDBClusterSnapshot",
RDS_NS,
&crate::extras::cluster_snapshot_xml(&snapshot_id, &arn, &cluster_id),
&request.request_id,
),
))
}
pub(super) async fn restore_db_cluster_from_snapshot(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
use serde_json::json;
let target = required_query_param(request, "DBClusterIdentifier")?;
let snapshot_id = optional_query_param(request, "SnapshotIdentifier")
.or_else(|| optional_query_param(request, "DBClusterSnapshotIdentifier"))
.ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"DBClusterSnapshotNotFoundFault",
"SnapshotIdentifier is required",
)
})?;
let arn = format!(
"arn:aws:rds:{}:{}:cluster:{}",
request.region, request.account_id, target
);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&request.account_id);
let snapshot = state
.extras
.get("cluster_snapshots")
.and_then(|m| m.get(&snapshot_id))
.cloned()
.ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"DBClusterSnapshotNotFoundFault",
format!("DBClusterSnapshot {snapshot_id} not found."),
)
})?;
let pending_dump_b64 = snapshot
.get("DumpDataB64")
.and_then(|v| v.as_str())
.map(str::to_string);
let mut entry = snapshot.clone();
if let Some(obj) = entry.as_object_mut() {
obj.insert("DBClusterIdentifier".to_string(), json!(target));
obj.insert("DBClusterArn".to_string(), json!(arn));
obj.insert("Status".to_string(), json!("available"));
obj.insert(
"Endpoint".to_string(),
json!(format!(
"{target}.cluster-xxx.{}.rds.amazonaws.com",
request.region
)),
);
obj.insert(
"ReaderEndpoint".to_string(),
json!(format!(
"{target}.cluster-ro-xxx.{}.rds.amazonaws.com",
request.region
)),
);
obj.remove("ReplicationSourceIdentifier");
obj.remove("DBClusterMembers");
obj.remove("WriterDBInstanceIdentifier");
obj.remove("DBClusterSnapshotIdentifier");
obj.remove("DBClusterSnapshotArn");
obj.remove("DumpDataB64");
if let Some(engine) = optional_query_param(request, "Engine") {
obj.insert("Engine".to_string(), json!(engine));
}
if let Some(version) = optional_query_param(request, "EngineVersion") {
obj.insert("EngineVersion".to_string(), json!(version));
}
if let Some(port) =
optional_query_param(request, "Port").and_then(|p| p.parse::<i64>().ok())
{
obj.insert("Port".to_string(), json!(port));
}
if let Some(b64) = pending_dump_b64 {
obj.insert("PendingRestoreDumpB64".to_string(), json!(b64));
}
}
state
.extras
.entry("clusters".to_string())
.or_default()
.insert(target.clone(), entry);
drop(accounts);
self.emit_event(
RdsSourceType::DbCluster,
&target,
&arn,
"RDS-EVENT-0170",
&["creation"],
"DB cluster restored from snapshot",
);
Ok(AwsResponse::xml(
StatusCode::OK,
query_response_xml(
"RestoreDBClusterFromSnapshot",
RDS_NS,
&crate::extras::db_cluster_xml(&target, &arn),
&request.request_id,
),
))
}
pub(super) async fn restore_db_cluster_to_point_in_time(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
use serde_json::json;
let target = required_query_param(request, "DBClusterIdentifier")?;
let source = optional_query_param(request, "SourceDBClusterIdentifier")
.or_else(|| optional_query_param(request, "SourceDbClusterResourceId"))
.ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"DBClusterNotFoundFault",
"Source DB cluster identifier not provided.",
)
})?;
let arn = format!(
"arn:aws:rds:{}:{}:cluster:{}",
request.region, request.account_id, target
);
let writer_info = {
let accounts = self.state.read();
accounts.get(&request.account_id).and_then(|state| {
let cluster_entry = state.extras.get("clusters")?.get(&source)?;
let writer_id = cluster_entry
.get("WriterDBInstanceIdentifier")
.and_then(|v| v.as_str())
.map(str::to_string)
.or_else(|| {
cluster_entry
.get("DBClusterMembers")
.and_then(|m| m.as_array())
.and_then(|arr| {
arr.iter()
.find(|m| m["IsClusterWriter"].as_bool() == Some(true))
.or_else(|| arr.first())
.and_then(|m| m["DBInstanceIdentifier"].as_str())
.map(str::to_string)
})
})?;
let inst = state.instances.get(&writer_id)?;
Some((
inst.db_instance_identifier.clone(),
inst.engine.clone(),
inst.master_username.clone(),
inst.master_user_password.clone(),
inst.db_name
.clone()
.unwrap_or_else(|| default_db_name(&inst.engine).to_string()),
))
})
};
let pending_dump_b64 = if let Some((wid, eng, user, pass, db)) = writer_info {
if let Some(runtime) = self.runtime_ref() {
match runtime.dump_database(&wid, &eng, &user, &pass, &db).await {
Ok(data) => {
use base64::Engine;
Some(base64::engine::general_purpose::STANDARD.encode(&data))
}
Err(e) => {
tracing::warn!(
error = %e,
cluster = %source,
writer = %wid,
"cluster PIT dump failed; falling back to metadata-only restore"
);
None
}
}
} else {
None
}
} else {
None
};
let mut accounts = self.state.write();
let state = accounts.get_or_create(&request.account_id);
let mut entry = state
.extras
.get("clusters")
.and_then(|m| m.get(&source))
.cloned()
.ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"DBClusterNotFoundFault",
format!("DBCluster {source} not found."),
)
})?;
if let Some(obj) = entry.as_object_mut() {
obj.insert("DBClusterIdentifier".to_string(), json!(target));
obj.insert("DBClusterArn".to_string(), json!(arn));
obj.insert("Status".to_string(), json!("available"));
obj.insert(
"Endpoint".to_string(),
json!(format!(
"{target}.cluster-xxx.{}.rds.amazonaws.com",
request.region
)),
);
obj.insert(
"ReaderEndpoint".to_string(),
json!(format!(
"{target}.cluster-ro-xxx.{}.rds.amazonaws.com",
request.region
)),
);
obj.remove("DBClusterMembers");
obj.remove("WriterDBInstanceIdentifier");
if let Some(restore_time) = optional_query_param(request, "RestoreToTime") {
obj.insert("RestoreToTime".to_string(), json!(restore_time));
}
if let Some(latest) = optional_query_param(request, "UseLatestRestorableTime") {
obj.insert("UseLatestRestorableTime".to_string(), json!(latest));
}
if let Some(b64) = pending_dump_b64 {
obj.insert("PendingRestoreDumpB64".to_string(), json!(b64));
}
}
state
.extras
.entry("clusters".to_string())
.or_default()
.insert(target.clone(), entry);
drop(accounts);
self.emit_event(
RdsSourceType::DbCluster,
&target,
&arn,
"RDS-EVENT-0171",
&["creation"],
"DB cluster restored to point in time",
);
Ok(AwsResponse::xml(
StatusCode::OK,
query_response_xml(
"RestoreDBClusterToPointInTime",
RDS_NS,
&crate::extras::db_cluster_xml(&target, &arn),
&request.request_id,
),
))
}
}