Skip to main content

fakecloud_rds/extras/
mod.rs

1//! RDS handlers added to close the conformance gap. Clusters, cluster
2//! snapshots / parameter groups / endpoints, security groups, option
3//! groups, event subscriptions, global clusters, integrations, blue/green
4//! deployments, shard groups, custom engine versions, tenant databases,
5//! proxies, export tasks, recommendations, certificates, accounts /
6//! events / pending maintenance, and start/stop/reboot/failover ops.
7//!
8//! Persists into per-account state via the generic
9//! `extras: HashMap<category, HashMap<id, Value>>` store on
10//! `RdsState`. Returns valid Query-protocol XML responses with
11//! stable IDs so SDK callers can chain operations.
12
13use http::StatusCode;
14use serde_json::{json, Value};
15use std::collections::BTreeMap;
16
17use fakecloud_aws::arn::Arn;
18use fakecloud_aws::xml::xml_escape;
19use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
20
21use crate::service::{RdsService, RdsSourceType};
22
23const NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
24
25fn rand_id() -> String {
26    format!(
27        "{:x}",
28        std::time::SystemTime::now()
29            .duration_since(std::time::UNIX_EPOCH)
30            .map(|d| d.as_nanos())
31            .unwrap_or(0)
32    )
33}
34
35pub(crate) fn xml_response(action: &str, inner: String, request_id: &str) -> AwsResponse {
36    let body = format!(
37        r#"<{action}Response xmlns="{NS}">
38  <{action}Result>
39{inner}
40  </{action}Result>
41  <ResponseMetadata>
42    <RequestId>{rid}</RequestId>
43  </ResponseMetadata>
44</{action}Response>"#,
45        action = action,
46        NS = NS,
47        inner = inner,
48        rid = xml_escape(request_id),
49    );
50    AwsResponse::xml(StatusCode::OK, body)
51}
52
53fn xml_response_no_result(action: &str, request_id: &str) -> AwsResponse {
54    let body = format!(
55        r#"<{action}Response xmlns="{NS}">
56  <ResponseMetadata>
57    <RequestId>{rid}</RequestId>
58  </ResponseMetadata>
59</{action}Response>"#,
60        action = action,
61        NS = NS,
62        rid = xml_escape(request_id),
63    );
64    AwsResponse::xml(StatusCode::OK, body)
65}
66
67fn members<F>(items: &[Value], render: F) -> String
68where
69    F: Fn(&Value) -> String,
70{
71    items
72        .iter()
73        .map(|v| format!("        <member>\n{}\n        </member>", render(v)))
74        .collect::<Vec<_>>()
75        .join("\n")
76}
77
78fn store<'a>(
79    extras: &'a mut BTreeMap<String, BTreeMap<String, Value>>,
80    category: &str,
81) -> &'a mut BTreeMap<String, Value> {
82    extras.entry(category.to_string()).or_default()
83}
84
85fn get_param(req: &AwsRequest, key: &str) -> Option<String> {
86    if let Some(v) = req.query_params.get(key) {
87        return Some(v.clone());
88    }
89    let body_params = fakecloud_core::protocol::parse_query_body(&req.body);
90    body_params.get(key).cloned()
91}
92
93fn missing(name: &str) -> AwsServiceError {
94    AwsServiceError::aws_error(
95        StatusCode::BAD_REQUEST,
96        "InvalidParameterValue",
97        format!("{name} is required"),
98    )
99}
100
101impl RdsService {
102    pub(crate) fn handle_extra_action(
103        &self,
104        req: &AwsRequest,
105    ) -> Result<AwsResponse, AwsServiceError> {
106        let action = req.action.clone();
107        let aid = req.account_id.clone();
108        let rid = req.request_id.clone();
109        let region = "us-east-1"; // RDS uses us-east-1 by default in fakecloud
110
111        macro_rules! write_state {
112            () => {{
113                let mut accounts = self.state_handle().write();
114                accounts.get_or_create(&aid);
115                accounts
116            }};
117        }
118
119        match action.as_str() {
120            // ── DB Clusters ──
121            "CreateDBCluster" => {
122                let id = get_param(req, "DBClusterIdentifier").ok_or_else(|| missing("DBClusterIdentifier"))?;
123                let arn = Arn::new("rds", region, &aid, &format!("cluster:{id}")).to_string();
124                let engine = get_param(req, "Engine").unwrap_or_else(|| "aurora-postgresql".to_string());
125                let port = get_param(req, "Port")
126                    .and_then(|p| p.parse::<i64>().ok())
127                    .unwrap_or(if engine.contains("mysql") { 3306 } else { 5432 });
128                let entry = json!({
129                    "DBClusterIdentifier": id, "DBClusterArn": arn,
130                    "DbClusterResourceId": new_cluster_resource_id(),
131                    "Status": "available", "Engine": engine,
132                    "EngineVersion": get_param(req, "EngineVersion").unwrap_or_else(|| "15.3".to_string()),
133                    "Endpoint": format!("{id}.cluster-xxx.{region}.rds.amazonaws.com"),
134                    "ReaderEndpoint": format!("{id}.cluster-ro-xxx.{region}.rds.amazonaws.com"),
135                    "Port": port, "MasterUsername": get_param(req, "MasterUsername").unwrap_or_else(|| "postgres".to_string()),
136                });
137                {
138                    let mut accounts = write_state!();
139                    let state = accounts.get_or_create(&aid);
140                    store(&mut state.extras, "clusters").insert(id.clone(), entry.clone());
141                }
142                self.emit_event(
143                    RdsSourceType::DbCluster,
144                    &id,
145                    &arn,
146                    "RDS-EVENT-0170",
147                    &["creation"],
148                    "DB cluster created",
149                );
150                Ok(xml_response(
151                    "CreateDBCluster",
152                    format!(
153                        "    <DBCluster>\n{}\n    </DBCluster>",
154                        db_cluster_member_xml(&entry)
155                    ),
156                    &rid,
157                ))
158            }
159            "DeleteDBCluster" => {
160                let id = get_param(req, "DBClusterIdentifier").ok_or_else(|| missing("DBClusterIdentifier"))?;
161                let arn = Arn::new("rds", region, &aid, &format!("cluster:{id}")).to_string();
162                {
163                    let mut accounts = write_state!();
164                    let state = accounts.get_or_create(&aid);
165                    if let Some(m) = state.extras.get_mut("clusters") { m.remove(&id); }
166                }
167                self.emit_event(
168                    RdsSourceType::DbCluster,
169                    &id,
170                    &arn,
171                    "RDS-EVENT-0171",
172                    &["deletion"],
173                    "DB cluster deleted",
174                );
175                Ok(xml_response("DeleteDBCluster", db_cluster_xml(&id, &arn), &rid))
176            }
177            "ModifyDBCluster" => modify_db_cluster_action(self, &aid, region, req, &rid),
178            "StartDBCluster" => start_db_cluster_action(self, &aid, region, req, &rid),
179            "StopDBCluster" => stop_db_cluster_action(self, &aid, region, req, &rid),
180            "RebootDBCluster" => reboot_db_cluster_action(self, &aid, region, req, &rid),
181            "FailoverDBCluster" => failover_db_cluster_action(self, &aid, region, req, &rid),
182            "BacktrackDBCluster" => backtrack_db_cluster_action(self, &aid, region, req, &rid),
183            "PromoteReadReplicaDBCluster" => {
184                let id = get_param(req, "DBClusterIdentifier")
185                    .ok_or_else(|| missing("DBClusterIdentifier"))?;
186                let arn = Arn::new("rds", region, &aid, &format!("cluster:{id}")).to_string();
187                let mut accounts = write_state!();
188                let state = accounts.get_or_create(&aid);
189                if let Some(map) = state.extras.get_mut("clusters") {
190                    if let Some(entry) = map.get_mut(&id) {
191                        if let Some(obj) = entry.as_object_mut() {
192                            obj.remove("ReplicationSourceIdentifier");
193                        }
194                    }
195                }
196                Ok(xml_response(
197                    "PromoteReadReplicaDBCluster",
198                    db_cluster_xml(&id, &arn),
199                    &rid,
200                ))
201            }
202            "DescribeDBClusters" => {
203                let id_filter = get_param(req, "DBClusterIdentifier");
204                let accounts = self.state_handle().read();
205                let items: Vec<Value> = accounts.get(&aid)
206                    .and_then(|s| s.extras.get("clusters"))
207                    .map(|m| {
208                        m.values()
209                            .filter(|v| {
210                                id_filter
211                                    .as_deref()
212                                    .map(|filter| v["DBClusterIdentifier"].as_str() == Some(filter))
213                                    .unwrap_or(true)
214                            })
215                            .cloned()
216                            .collect()
217                    })
218                    .unwrap_or_default();
219                let body = items
220                    .iter()
221                    .map(|v| {
222                        format!(
223                            "      <DBCluster>\n{}\n      </DBCluster>",
224                            db_cluster_member_xml(v)
225                        )
226                    })
227                    .collect::<Vec<_>>()
228                    .join("\n");
229                let inner = format!("    <DBClusters>\n{body}\n    </DBClusters>");
230                Ok(xml_response("DescribeDBClusters", inner, &rid))
231            }
232
233            // ── DB Cluster snapshots ──
234            // CreateDBClusterSnapshot is implemented in service.rs because
235            // the real path needs the async runtime to dump the writer's
236            // database. We keep a metadata-only fallback here for unit
237            // tests that exercise the extras handler directly without a
238            // runtime wired up; the dispatcher in `RdsService::handle_request`
239            // routes the action through the async path before reaching us.
240            "CreateDBClusterSnapshot" => {
241                let id = get_param(req, "DBClusterSnapshotIdentifier")
242                    .ok_or_else(|| missing("DBClusterSnapshotIdentifier"))?;
243                let arn = Arn::new("rds", region, &aid, &format!("cluster-snapshot:{id}")).to_string();
244                let cluster = get_param(req, "DBClusterIdentifier").unwrap_or_else(|| "default".to_string());
245                {
246                    let mut accounts = write_state!();
247                    let state = accounts.get_or_create(&aid);
248                    let mut entry = state
249                        .extras
250                        .get("clusters")
251                        .and_then(|m| m.get(&cluster))
252                        .cloned()
253                        .unwrap_or_else(|| json!({}));
254                    if let Some(obj) = entry.as_object_mut() {
255                        obj.insert("DBClusterSnapshotIdentifier".to_string(), json!(id));
256                        obj.insert("DBClusterSnapshotArn".to_string(), json!(arn));
257                        obj.insert("DBClusterIdentifier".to_string(), json!(cluster));
258                        obj.insert("Status".to_string(), json!("available"));
259                        obj.insert("SnapshotType".to_string(), json!("manual"));
260                    }
261                    store(&mut state.extras, "cluster_snapshots").insert(id.clone(), entry);
262                }
263                self.emit_event(
264                    RdsSourceType::DbClusterSnapshot,
265                    &id,
266                    &arn,
267                    "RDS-EVENT-0074",
268                    &["backup"],
269                    "DB cluster snapshot created",
270                );
271                Ok(xml_response(action.as_str(), cluster_snapshot_xml(&id, &arn, &cluster), &rid))
272            }
273            "CopyDBClusterSnapshot" => {
274                let id = get_param(req, "TargetDBClusterSnapshotIdentifier")
275                    .ok_or_else(|| missing("TargetDBClusterSnapshotIdentifier"))?;
276                let source_id = get_param(req, "SourceDBClusterSnapshotIdentifier")
277                    .ok_or_else(|| missing("SourceDBClusterSnapshotIdentifier"))?;
278                let arn = Arn::new("rds", region, &aid, &format!("cluster-snapshot:{id}")).to_string();
279                let mut accounts = write_state!();
280                let state = accounts.get_or_create(&aid);
281                let source_key = source_id.rsplit(':').next().unwrap_or(&source_id).to_string();
282                let mut entry = state
283                    .extras
284                    .get("cluster_snapshots")
285                    .and_then(|m| m.get(&source_key))
286                    .cloned()
287                    .ok_or_else(|| {
288                        AwsServiceError::aws_error(
289                            StatusCode::NOT_FOUND,
290                            "DBClusterSnapshotNotFoundFault",
291                            format!("DBClusterSnapshot {source_id} not found."),
292                        )
293                    })?;
294                let cluster = entry
295                    .get("DBClusterIdentifier")
296                    .and_then(|v| v.as_str())
297                    .unwrap_or("default")
298                    .to_string();
299                if let Some(obj) = entry.as_object_mut() {
300                    obj.insert("DBClusterSnapshotIdentifier".to_string(), json!(id));
301                    obj.insert("DBClusterSnapshotArn".to_string(), json!(arn));
302                    obj.insert("Status".to_string(), json!("available"));
303                    obj.insert("SnapshotType".to_string(), json!("manual"));
304                    obj.insert("SourceDBClusterSnapshotArn".to_string(), json!(source_id));
305                }
306                store(&mut state.extras, "cluster_snapshots").insert(id.clone(), entry);
307                Ok(xml_response(action.as_str(), cluster_snapshot_xml(&id, &arn, &cluster), &rid))
308            }
309            "DeleteDBClusterSnapshot" => {
310                let id = get_param(req, "DBClusterSnapshotIdentifier").ok_or_else(|| missing("DBClusterSnapshotIdentifier"))?;
311                let arn = Arn::new("rds", region, &aid, &format!("cluster-snapshot:{id}")).to_string();
312                // Recover the source cluster id from stored state before
313                // remove — emitting a hardcoded "default" would corrupt
314                // downstream consumers that key off DBClusterIdentifier.
315                let cluster = {
316                    let mut accounts = write_state!();
317                    let state = accounts.get_or_create(&aid);
318                    let prior = state
319                        .extras
320                        .get("cluster_snapshots")
321                        .and_then(|m| m.get(&id))
322                        .and_then(|v| v.get("DBClusterIdentifier"))
323                        .and_then(|v| v.as_str())
324                        .unwrap_or_default()
325                        .to_string();
326                    if let Some(m) = state.extras.get_mut("cluster_snapshots") { m.remove(&id); }
327                    prior
328                };
329                self.emit_event(
330                    RdsSourceType::DbClusterSnapshot,
331                    &id,
332                    &arn,
333                    "RDS-EVENT-0075",
334                    &["deletion"],
335                    "DB cluster snapshot deleted",
336                );
337                Ok(xml_response("DeleteDBClusterSnapshot", cluster_snapshot_xml(&id, &arn, &cluster), &rid))
338            }
339            "DescribeDBClusterSnapshots" => list_extras_xml(self, &aid, "cluster_snapshots", "DBClusterSnapshots", "DescribeDBClusterSnapshots", cluster_snapshot_member_xml, &rid),
340            "DescribeDBClusterSnapshotAttributes" | "ModifyDBClusterSnapshotAttribute" => {
341                let id = get_param(req, "DBClusterSnapshotIdentifier").unwrap_or_default();
342                Ok(xml_response(action.as_str(), format!("    <DBClusterSnapshotAttributesResult>\n      <DBClusterSnapshotIdentifier>{}</DBClusterSnapshotIdentifier>\n      <DBClusterSnapshotAttributes/>\n    </DBClusterSnapshotAttributesResult>", xml_escape(&id)), &rid))
343            }
344            "DescribeDBClusterAutomatedBackups" => Ok(xml_response("DescribeDBClusterAutomatedBackups", "    <DBClusterAutomatedBackups/>".to_string(), &rid)),
345            "DeleteDBClusterAutomatedBackup" => Ok(xml_response("DeleteDBClusterAutomatedBackup", "    <DBClusterAutomatedBackup/>".to_string(), &rid)),
346            "DescribeDBClusterBacktracks" => Ok(xml_response("DescribeDBClusterBacktracks", "    <DBClusterBacktracks/>".to_string(), &rid)),
347
348            // ── DB Cluster parameter groups ──
349            "CreateDBClusterParameterGroup" | "CopyDBClusterParameterGroup" => {
350                let name = get_param(req, "DBClusterParameterGroupName").or_else(|| get_param(req, "TargetDBClusterParameterGroupIdentifier"))
351                    .ok_or_else(|| missing("DBClusterParameterGroupName"))?;
352                let arn = Arn::new("rds", region, &aid, &format!("cluster-pg:{name}")).to_string();
353                let family = get_param(req, "DBParameterGroupFamily").unwrap_or_else(|| "aurora-postgresql15".to_string());
354                let description = get_param(req, "Description").unwrap_or_default();
355                let entry = json!({"DBClusterParameterGroupName": name, "DBClusterParameterGroupArn": arn, "DBParameterGroupFamily": family, "Description": description});
356                let mut accounts = write_state!();
357                let state = accounts.get_or_create(&aid);
358                store(&mut state.extras, "cluster_param_groups").insert(name.clone(), entry);
359                Ok(xml_response(action.as_str(), cluster_pg_xml(&name, &arn, &family, &description), &rid))
360            }
361            "ModifyDBClusterParameterGroup" => {
362                let name = get_param(req, "DBClusterParameterGroupName").ok_or_else(|| missing("DBClusterParameterGroupName"))?;
363                let parsed = crate::service::parse_db_parameter_members(req);
364                let mut accounts = write_state!();
365                let state = accounts.get_or_create(&aid);
366                if let Some(map) = state.extras.get_mut("cluster_param_groups") {
367                    if let Some(entry) = map.get_mut(&name) {
368                        if let Some(obj) = entry.as_object_mut() {
369                            if !obj.contains_key("Parameters") {
370                                obj.insert("Parameters".to_string(), json!({}));
371                            }
372                            if !obj.contains_key("ParameterApplyMethods") {
373                                obj.insert("ParameterApplyMethods".to_string(), json!({}));
374                            }
375                            // Capture values and apply methods separately so the
376                            // existing string-valued `Parameters` map shape stays
377                            // backward compatible with older persisted snapshots.
378                            let apply_methods: Vec<(String, String)> = parsed
379                                .iter()
380                                .map(|p| (p.name.clone(), p.apply_method.clone()))
381                                .collect();
382                            if let Some(p) = obj.get_mut("Parameters").and_then(|p| p.as_object_mut()) {
383                                for param in &parsed {
384                                    p.insert(param.name.clone(), json!(param.value));
385                                }
386                            }
387                            if let Some(m) = obj
388                                .get_mut("ParameterApplyMethods")
389                                .and_then(|m| m.as_object_mut())
390                            {
391                                for (n, am) in apply_methods {
392                                    m.insert(n, json!(am));
393                                }
394                            }
395                        }
396                    }
397                }
398                Ok(xml_response("ModifyDBClusterParameterGroup", format!("    <DBClusterParameterGroupName>{}</DBClusterParameterGroupName>", xml_escape(&name)), &rid))
399            }
400            "ResetDBClusterParameterGroup" => {
401                let name = get_param(req, "DBClusterParameterGroupName").ok_or_else(|| missing("DBClusterParameterGroupName"))?;
402                let reset_all = get_param(req, "ResetAllParameters")
403                    .map(|v| v.eq_ignore_ascii_case("true"))
404                    .unwrap_or(false);
405                let named: Vec<String> = crate::service::parse_db_parameter_members(req)
406                    .into_iter()
407                    .map(|p| p.name)
408                    .collect();
409                {
410                    let mut accounts = write_state!();
411                    let state = accounts.get_or_create(&aid);
412                    if let Some(entry) = state
413                        .extras
414                        .get_mut("cluster_param_groups")
415                        .and_then(|m| m.get_mut(&name))
416                        .and_then(|e| e.as_object_mut())
417                    {
418                        for key in ["Parameters", "ParameterApplyMethods"] {
419                            if let Some(obj) = entry.get_mut(key).and_then(|p| p.as_object_mut()) {
420                                if reset_all || named.is_empty() {
421                                    obj.clear();
422                                } else {
423                                    for n in &named {
424                                        obj.remove(n);
425                                    }
426                                }
427                            }
428                        }
429                    }
430                }
431                Ok(xml_response("ResetDBClusterParameterGroup", format!("    <DBClusterParameterGroupName>{}</DBClusterParameterGroupName>", xml_escape(&name)), &rid))
432            }
433            "DeleteDBClusterParameterGroup" => {
434                let name = get_param(req, "DBClusterParameterGroupName").ok_or_else(|| missing("DBClusterParameterGroupName"))?;
435                let mut accounts = write_state!();
436                let state = accounts.get_or_create(&aid);
437                if let Some(m) = state.extras.get_mut("cluster_param_groups") { m.remove(&name); }
438                xml_empty_action(&action, &rid)
439            }
440            "DescribeDBClusterParameterGroups" => {
441                // RDS query lists wrap each element in its named member tag
442                // (`<DBClusterParameterGroup>`), not the generic `<member>`;
443                // the AWS SDK unmarshaler returns an empty list otherwise.
444                // AWS also filters by name and raises NotFound for an unknown
445                // group rather than returning everything.
446                let wanted = get_param(req, "DBClusterParameterGroupName");
447                let accounts = self.state_handle().read();
448                let groups: Vec<Value> = accounts
449                    .get(&aid)
450                    .and_then(|s| s.extras.get("cluster_param_groups"))
451                    .map(|m| m.values().cloned().collect())
452                    .unwrap_or_default();
453                if let Some(name) = &wanted {
454                    let found = groups.iter().any(|g| {
455                        g["DBClusterParameterGroupName"].as_str() == Some(name.as_str())
456                    });
457                    if !found {
458                        return Err(AwsServiceError::aws_error(
459                            StatusCode::NOT_FOUND,
460                            "DBParameterGroupNotFound",
461                            format!("DBClusterParameterGroup not found: {name}"),
462                        ));
463                    }
464                }
465                let members = groups
466                    .iter()
467                    .filter(|g| {
468                        wanted.as_deref().is_none_or(|n| {
469                            g["DBClusterParameterGroupName"].as_str() == Some(n)
470                        })
471                    })
472                    .map(|g| {
473                        format!(
474                            "        <DBClusterParameterGroup>\n{}\n        </DBClusterParameterGroup>",
475                            cluster_pg_member_xml(g)
476                        )
477                    })
478                    .collect::<Vec<_>>()
479                    .join("\n");
480                Ok(xml_response(
481                    "DescribeDBClusterParameterGroups",
482                    format!("    <DBClusterParameterGroups>\n{members}\n    </DBClusterParameterGroups>"),
483                    &rid,
484                ))
485            }
486            "DescribeDBClusterParameters" => {
487                let name = get_param(req, "DBClusterParameterGroupName").ok_or_else(|| missing("DBClusterParameterGroupName"))?;
488                let source_filter = get_param(req, "Source");
489                let source = source_filter.as_deref();
490                let include_user = source.is_none_or(|s| s == "user");
491                let include_engine_default = source.is_none_or(|s| s == "engine-default");
492                let accounts = self.state_handle().read();
493                let state = accounts.get(&aid);
494                let entry = state
495                    .and_then(|s| s.extras.get("cluster_param_groups"))
496                    .and_then(|m| m.get(&name));
497                let family = entry
498                    .and_then(|e| e.get("DBParameterGroupFamily"))
499                    .and_then(|f| f.as_str())
500                    .unwrap_or("aurora-postgresql15")
501                    .to_string();
502                let user_params: BTreeMap<String, String> = entry
503                    .and_then(|e| e.get("Parameters"))
504                    .and_then(|p| p.as_object())
505                    .map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string())).collect())
506                    .unwrap_or_default();
507                let apply_methods: BTreeMap<String, String> = entry
508                    .and_then(|e| e.get("ParameterApplyMethods"))
509                    .and_then(|p| p.as_object())
510                    .map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string())).collect())
511                    .unwrap_or_default();
512                let mut members = String::new();
513                if include_user {
514                    for (n, v) in &user_params {
515                        let apply_method = apply_methods.get(n).map(String::as_str).unwrap_or("immediate");
516                        members.push_str(&crate::service::render_user_parameter_xml(n, v, apply_method));
517                    }
518                }
519                if include_engine_default {
520                    // A user override flips a parameter's effective
521                    // source from `engine-default` to `user`, so we
522                    // always hide modified parameters from engine-default
523                    // views — even if the caller filtered to that source.
524                    for default in crate::state::engine_default_parameters(&family) {
525                        if user_params.contains_key(default.name) {
526                            continue;
527                        }
528                        members.push_str(&crate::service::render_engine_default_parameter_xml(default));
529                    }
530                }
531                Ok(xml_response("DescribeDBClusterParameters", format!("    <Parameters>\n{members}    </Parameters>"), &rid))
532            }
533            "DescribeEngineDefaultClusterParameters" => {
534                let family = get_param(req, "DBParameterGroupFamily").unwrap_or_else(|| "aurora-postgresql15".to_string());
535                let mut members = String::new();
536                for default in crate::state::engine_default_parameters(&family) {
537                    members.push_str(&crate::service::render_engine_default_parameter_xml(default));
538                }
539                let body = format!(
540                    "    <EngineDefaults>\n      <DBParameterGroupFamily>{}</DBParameterGroupFamily>\n      <Parameters>\n{}      </Parameters>\n    </EngineDefaults>",
541                    xml_escape(&family),
542                    members,
543                );
544                Ok(xml_response("DescribeEngineDefaultClusterParameters", body, &rid))
545            }
546
547            // ── DB Cluster endpoints ──
548            "CreateDBClusterEndpoint" => {
549                let id = get_param(req, "DBClusterEndpointIdentifier").ok_or_else(|| missing("DBClusterEndpointIdentifier"))?;
550                let cluster = get_param(req, "DBClusterIdentifier").unwrap_or_default();
551                let kind = get_param(req, "EndpointType").unwrap_or_else(|| "READER".to_string());
552                let entry = json!({"DBClusterEndpointIdentifier": id, "DBClusterIdentifier": cluster, "Endpoint": format!("{id}.cluster-custom.{region}.rds.amazonaws.com"), "EndpointType": kind, "Status": "available"});
553                let mut accounts = write_state!();
554                let state = accounts.get_or_create(&aid);
555                store(&mut state.extras, "cluster_endpoints").insert(id.clone(), entry.clone());
556                Ok(xml_response("CreateDBClusterEndpoint", cluster_endpoint_xml(&entry), &rid))
557            }
558            "ModifyDBClusterEndpoint" => {
559                let id = get_param(req, "DBClusterEndpointIdentifier").ok_or_else(|| missing("DBClusterEndpointIdentifier"))?;
560                let static_members = parse_member_list(req, "StaticMembers");
561                let excluded_members = parse_member_list(req, "ExcludedMembers");
562                let mut accounts = write_state!();
563                let state = accounts.get_or_create(&aid);
564                let entry = state
565                    .extras
566                    .get_mut("cluster_endpoints")
567                    .and_then(|m| m.get_mut(&id))
568                    .ok_or_else(|| {
569                        AwsServiceError::aws_error(
570                            StatusCode::NOT_FOUND,
571                            "DBClusterEndpointNotFoundFault",
572                            format!("DBClusterEndpoint {id} not found."),
573                        )
574                    })?;
575                if let Some(obj) = entry.as_object_mut() {
576                    if let Some(kind) = get_param(req, "EndpointType") {
577                        obj.insert("EndpointType".to_string(), json!(kind));
578                    }
579                    if !static_members.is_empty() {
580                        obj.insert("StaticMembers".to_string(), json!(static_members));
581                    }
582                    if !excluded_members.is_empty() {
583                        obj.insert("ExcludedMembers".to_string(), json!(excluded_members));
584                    }
585                }
586                let updated = entry.clone();
587                Ok(xml_response("ModifyDBClusterEndpoint", cluster_endpoint_xml(&updated), &rid))
588            }
589            "DeleteDBClusterEndpoint" => {
590                let id = get_param(req, "DBClusterEndpointIdentifier").ok_or_else(|| missing("DBClusterEndpointIdentifier"))?;
591                let mut accounts = write_state!();
592                let state = accounts.get_or_create(&aid);
593                if let Some(m) = state.extras.get_mut("cluster_endpoints") { m.remove(&id); }
594                Ok(xml_response("DeleteDBClusterEndpoint", format!("    <DBClusterEndpointIdentifier>{}</DBClusterEndpointIdentifier>", xml_escape(&id)), &rid))
595            }
596            "DescribeDBClusterEndpoints" => list_extras_xml(self, &aid, "cluster_endpoints", "DBClusterEndpoints", "DescribeDBClusterEndpoints", cluster_endpoint_xml, &rid),
597
598            // ── DB Proxies ──
599            "CreateDBProxy" => {
600                let name = get_param(req, "DBProxyName").ok_or_else(|| missing("DBProxyName"))?;
601                let arn = Arn::new("rds", region, &aid, &format!("db-proxy:{name}")).to_string();
602                let entry = json!({"DBProxyName": name, "DBProxyArn": arn, "Status": "available", "EngineFamily": get_param(req, "EngineFamily").unwrap_or_else(|| "POSTGRESQL".to_string())});
603                let mut accounts = write_state!();
604                let state = accounts.get_or_create(&aid);
605                store(&mut state.extras, "proxies").insert(name.clone(), entry.clone());
606                Ok(xml_response("CreateDBProxy", proxy_xml(&entry), &rid))
607            }
608            "ModifyDBProxy" => {
609                let name = get_param(req, "DBProxyName").ok_or_else(|| missing("DBProxyName"))?;
610                let auth = parse_proxy_auth(req);
611                let new_name = get_param(req, "NewDBProxyName");
612                let mut accounts = write_state!();
613                let state = accounts.get_or_create(&aid);
614                let entry = state
615                    .extras
616                    .get_mut("proxies")
617                    .and_then(|m| m.get_mut(&name))
618                    .ok_or_else(|| {
619                        AwsServiceError::aws_error(
620                            StatusCode::NOT_FOUND,
621                            "DBProxyNotFoundFault",
622                            format!("DBProxy {name} not found."),
623                        )
624                    })?;
625                if let Some(obj) = entry.as_object_mut() {
626                    if !auth.is_empty() {
627                        obj.insert("Auth".to_string(), json!(auth));
628                    }
629                    if let Some(v) = get_param(req, "RequireTLS") {
630                        obj.insert("RequireTLS".to_string(), json!(v.eq_ignore_ascii_case("true")));
631                    }
632                    if let Some(v) = get_param(req, "IdleClientTimeout").and_then(|s| s.parse::<i64>().ok()) {
633                        obj.insert("IdleClientTimeout".to_string(), json!(v));
634                    }
635                    if let Some(v) = get_param(req, "DebugLogging") {
636                        obj.insert("DebugLogging".to_string(), json!(v.eq_ignore_ascii_case("true")));
637                    }
638                    if let Some(v) = new_name.as_ref() {
639                        obj.insert("DBProxyName".to_string(), json!(v));
640                    }
641                }
642                let updated = entry.clone();
643                // Rekey the map so subsequent Describe/Delete/Modify
644                // against NewDBProxyName actually find the entry —
645                // otherwise the rename only mutates the payload field
646                // and Describe still keys by the old name.
647                if let Some(new) = new_name {
648                    if new != name {
649                        if let Some(m) = state.extras.get_mut("proxies") {
650                            if let Some(val) = m.remove(&name) {
651                                m.insert(new.clone(), val);
652                            }
653                        }
654                        // proxy_target_groups is keyed `<proxy>/<group>` —
655                        // rekey every entry that belongs to this proxy so
656                        // filtered describes by the new name keep matching.
657                        if let Some(m) = state.extras.get_mut("proxy_target_groups") {
658                            let old_prefix = format!("{name}/");
659                            let migrations: Vec<(String, String)> = m
660                                .keys()
661                                .filter(|k| k.starts_with(&old_prefix))
662                                .map(|k| {
663                                    let suffix = &k[old_prefix.len()..];
664                                    (k.clone(), format!("{new}/{suffix}"))
665                                })
666                                .collect();
667                            for (old_k, new_k) in migrations {
668                                if let Some(mut val) = m.remove(&old_k) {
669                                    if let Some(obj) = val.as_object_mut() {
670                                        obj.insert("DBProxyName".to_string(), json!(new));
671                                    }
672                                    m.insert(new_k, val);
673                                }
674                            }
675                        }
676                    }
677                }
678                Ok(xml_response("ModifyDBProxy", format!("    <DBProxy>\n{}\n    </DBProxy>", proxy_xml(&updated)), &rid))
679            }
680            "DeleteDBProxy" => {
681                let name = get_param(req, "DBProxyName").ok_or_else(|| missing("DBProxyName"))?;
682                let mut accounts = write_state!();
683                let state = accounts.get_or_create(&aid);
684                if let Some(m) = state.extras.get_mut("proxies") { m.remove(&name); }
685                Ok(xml_response("DeleteDBProxy", "    <DBProxy/>".to_string(), &rid))
686            }
687            "DescribeDBProxies" => list_extras_xml(self, &aid, "proxies", "DBProxies", "DescribeDBProxies", proxy_xml, &rid),
688            "CreateDBProxyEndpoint" => {
689                let name = get_param(req, "DBProxyEndpointName").ok_or_else(|| missing("DBProxyEndpointName"))?;
690                let entry = json!({"DBProxyEndpointName": name, "Status": "available"});
691                let mut accounts = write_state!();
692                let state = accounts.get_or_create(&aid);
693                store(&mut state.extras, "proxy_endpoints").insert(name.clone(), entry);
694                Ok(xml_response("CreateDBProxyEndpoint", format!("    <DBProxyEndpoint>\n      <DBProxyEndpointName>{}</DBProxyEndpointName>\n    </DBProxyEndpoint>", xml_escape(&name)), &rid))
695            }
696            "ModifyDBProxyEndpoint" => {
697                let name = get_param(req, "DBProxyEndpointName").ok_or_else(|| missing("DBProxyEndpointName"))?;
698                let vpc_sgs = parse_member_list(req, "VpcSecurityGroupIds");
699                let new_name = get_param(req, "NewDBProxyEndpointName");
700                let mut accounts = write_state!();
701                let state = accounts.get_or_create(&aid);
702                let entry = state
703                    .extras
704                    .get_mut("proxy_endpoints")
705                    .and_then(|m| m.get_mut(&name))
706                    .ok_or_else(|| {
707                        AwsServiceError::aws_error(
708                            StatusCode::NOT_FOUND,
709                            "DBProxyEndpointNotFoundFault",
710                            format!("DBProxyEndpoint {name} not found."),
711                        )
712                    })?;
713                if let Some(obj) = entry.as_object_mut() {
714                    if !vpc_sgs.is_empty() {
715                        obj.insert("VpcSecurityGroupIds".to_string(), json!(vpc_sgs));
716                    }
717                    if let Some(v) = new_name.as_ref() {
718                        obj.insert("DBProxyEndpointName".to_string(), json!(v));
719                    }
720                }
721                let final_name = new_name.clone().unwrap_or_else(|| name.clone());
722                // Rekey so the rename is visible to subsequent lookups,
723                // not just to the payload field.
724                if let Some(new) = new_name {
725                    if new != name {
726                        if let Some(m) = state.extras.get_mut("proxy_endpoints") {
727                            if let Some(val) = m.remove(&name) {
728                                m.insert(new, val);
729                            }
730                        }
731                    }
732                }
733                Ok(xml_response("ModifyDBProxyEndpoint", format!("    <DBProxyEndpoint>\n      <DBProxyEndpointName>{}</DBProxyEndpointName>\n    </DBProxyEndpoint>", xml_escape(&final_name)), &rid))
734            }
735            "DeleteDBProxyEndpoint" => {
736                let name = get_param(req, "DBProxyEndpointName").ok_or_else(|| missing("DBProxyEndpointName"))?;
737                let mut accounts = write_state!();
738                let state = accounts.get_or_create(&aid);
739                if let Some(m) = state.extras.get_mut("proxy_endpoints") { m.remove(&name); }
740                Ok(xml_response("DeleteDBProxyEndpoint", "    <DBProxyEndpoint/>".to_string(), &rid))
741            }
742            "DescribeDBProxyEndpoints" => {
743                let accounts = self.state.read();
744                let state_opt = accounts.get(&aid);
745                let mut members = String::new();
746                if let Some(state) = state_opt {
747                    if let Some(m) = state.extras.get("proxy_endpoints") {
748                        for v in m.values() {
749                            // Render with the same field shape as
750                            // CreateDBProxyEndpoint above so consumers
751                            // see the persisted endpoint name and any
752                            // VpcSecurityGroupIds we recorded.
753                            let n = v
754                                .get("DBProxyEndpointName")
755                                .and_then(|x| x.as_str())
756                                .unwrap_or_default();
757                            members.push_str(&format!(
758                                "      <member>\n        <DBProxyEndpointName>{}</DBProxyEndpointName>\n      </member>\n",
759                                xml_escape(n)
760                            ));
761                        }
762                    }
763                }
764                Ok(xml_response("DescribeDBProxyEndpoints", format!("    <DBProxyEndpoints>\n{members}    </DBProxyEndpoints>"), &rid))
765            }
766            "DescribeDBProxyTargetGroups" => {
767                let accounts = self.state.read();
768                let state_opt = accounts.get(&aid);
769                let filter_proxy = get_param(req, "DBProxyName");
770                let mut members = String::new();
771                if let Some(state) = state_opt {
772                    if let Some(m) = state.extras.get("proxy_target_groups") {
773                        for v in m.values() {
774                            let proxy = v
775                                .get("DBProxyName")
776                                .and_then(|x| x.as_str())
777                                .unwrap_or_default();
778                            if let Some(want) = filter_proxy.as_deref() {
779                                if proxy != want {
780                                    continue;
781                                }
782                            }
783                            let tgn = v
784                                .get("TargetGroupName")
785                                .and_then(|x| x.as_str())
786                                .unwrap_or_default();
787                            members.push_str(&format!(
788                                "      <member>\n        <DBProxyName>{}</DBProxyName>\n        <TargetGroupName>{}</TargetGroupName>\n      </member>\n",
789                                xml_escape(proxy), xml_escape(tgn)
790                            ));
791                        }
792                    }
793                }
794                Ok(xml_response("DescribeDBProxyTargetGroups", format!("    <TargetGroups>\n{members}    </TargetGroups>"), &rid))
795            }
796            "DescribeDBProxyTargets" => {
797                let proxy = get_param(req, "DBProxyName").ok_or_else(|| missing("DBProxyName"))?;
798                let group = get_param(req, "TargetGroupName").unwrap_or_else(|| "default".to_string());
799                let key = format!("{proxy}/{group}");
800                let accounts = self.state_handle().read();
801                let targets: Vec<Value> = accounts
802                    .get(&aid)
803                    .and_then(|s| s.extras.get("proxy_targets"))
804                    .and_then(|m| m.get(&key))
805                    .and_then(|v| v.as_array())
806                    .cloned()
807                    .unwrap_or_default();
808                let members: String = targets.iter().map(db_proxy_target_xml).collect();
809                Ok(xml_response("DescribeDBProxyTargets", format!("    <Targets>{members}</Targets>"), &rid))
810            }
811            "ModifyDBProxyTargetGroup" => {
812                let proxy = get_param(req, "DBProxyName").ok_or_else(|| missing("DBProxyName"))?;
813                let group = get_param(req, "TargetGroupName").unwrap_or_else(|| "default".to_string());
814                let key = format!("{proxy}/{group}");
815                let mut pool = serde_json::Map::new();
816                if let Some(v) = get_param(req, "ConnectionPoolConfig.MaxConnectionsPercent").and_then(|s| s.parse::<i64>().ok()) {
817                    pool.insert("MaxConnectionsPercent".to_string(), json!(v));
818                }
819                if let Some(v) = get_param(req, "ConnectionPoolConfig.MaxIdleConnectionsPercent").and_then(|s| s.parse::<i64>().ok()) {
820                    pool.insert("MaxIdleConnectionsPercent".to_string(), json!(v));
821                }
822                if let Some(v) = get_param(req, "ConnectionPoolConfig.ConnectionBorrowTimeout").and_then(|s| s.parse::<i64>().ok()) {
823                    pool.insert("ConnectionBorrowTimeout".to_string(), json!(v));
824                }
825                if let Some(v) = get_param(req, "ConnectionPoolConfig.SessionPinningFilters") {
826                    pool.insert("SessionPinningFilters".to_string(), json!(v));
827                }
828                if let Some(v) = get_param(req, "ConnectionPoolConfig.InitQuery") {
829                    pool.insert("InitQuery".to_string(), json!(v));
830                }
831                let entry = json!({
832                    "DBProxyName": proxy,
833                    "TargetGroupName": group,
834                    "ConnectionPoolConfig": Value::Object(pool),
835                });
836                let mut accounts = write_state!();
837                let state = accounts.get_or_create(&aid);
838                store(&mut state.extras, "proxy_target_groups").insert(key, entry.clone());
839                Ok(xml_response("ModifyDBProxyTargetGroup", format!("    <DBProxyTargetGroup>\n      <DBProxyName>{}</DBProxyName>\n      <TargetGroupName>{}</TargetGroupName>\n    </DBProxyTargetGroup>", xml_escape(&proxy), xml_escape(&group)), &rid))
840            }
841            "RegisterDBProxyTargets" => {
842                let proxy = get_param(req, "DBProxyName").ok_or_else(|| missing("DBProxyName"))?;
843                let group = get_param(req, "TargetGroupName").unwrap_or_else(|| "default".to_string());
844                let key = format!("{proxy}/{group}");
845                let instances = parse_member_list(req, "DBInstanceIdentifiers");
846                let clusters = parse_member_list(req, "DBClusterIdentifiers");
847                let new_targets: Vec<Value> = instances
848                    .iter()
849                    .map(|id| json!({"RdsResourceId": id, "Type": "RDS_INSTANCE", "Port": 3306, "Endpoint": format!("{id}.{region}.rds.amazonaws.com")}))
850                    .chain(clusters.iter().map(|id| {
851                        json!({"RdsResourceId": id, "Type": "TRACKED_CLUSTER", "Port": 3306, "Endpoint": format!("{id}.cluster-{region}.rds.amazonaws.com")})
852                    }))
853                    .collect();
854                {
855                    let mut accounts = write_state!();
856                    let state = accounts.get_or_create(&aid);
857                    let map = store(&mut state.extras, "proxy_targets");
858                    let existing = map.entry(key).or_insert_with(|| json!([]));
859                    if let Some(arr) = existing.as_array_mut() {
860                        for t in &new_targets {
861                            let rid_val = t["RdsResourceId"].as_str();
862                            arr.retain(|e| e["RdsResourceId"].as_str() != rid_val);
863                            arr.push(t.clone());
864                        }
865                    }
866                }
867                let members: String = new_targets.iter().map(db_proxy_target_xml).collect();
868                Ok(xml_response("RegisterDBProxyTargets", format!("    <DBProxyTargets>{members}</DBProxyTargets>"), &rid))
869            }
870            "DeregisterDBProxyTargets" => {
871                let proxy = get_param(req, "DBProxyName").ok_or_else(|| missing("DBProxyName"))?;
872                let group = get_param(req, "TargetGroupName").unwrap_or_else(|| "default".to_string());
873                let key = format!("{proxy}/{group}");
874                let remove: Vec<String> = parse_member_list(req, "DBInstanceIdentifiers")
875                    .into_iter()
876                    .chain(parse_member_list(req, "DBClusterIdentifiers"))
877                    .collect();
878                {
879                    let mut accounts = write_state!();
880                    let state = accounts.get_or_create(&aid);
881                    if let Some(arr) = state
882                        .extras
883                        .get_mut("proxy_targets")
884                        .and_then(|m| m.get_mut(&key))
885                        .and_then(|v| v.as_array_mut())
886                    {
887                        arr.retain(|e| {
888                            e["RdsResourceId"]
889                                .as_str()
890                                .map(|r| !remove.iter().any(|x| x == r))
891                                .unwrap_or(true)
892                        });
893                    }
894                }
895                xml_empty_action(&action, &rid)
896            }
897
898            // ── Security groups (legacy) ──
899            "CreateDBSecurityGroup" | "AuthorizeDBSecurityGroupIngress" | "RevokeDBSecurityGroupIngress" => {
900                let name = get_param(req, "DBSecurityGroupName").ok_or_else(|| missing("DBSecurityGroupName"))?;
901                let entry = json!({"DBSecurityGroupName": name, "DBSecurityGroupDescription": get_param(req, "DBSecurityGroupDescription").unwrap_or_default(), "OwnerId": aid.clone()});
902                let mut accounts = write_state!();
903                let state = accounts.get_or_create(&aid);
904                store(&mut state.extras, "security_groups").insert(name.clone(), entry.clone());
905                Ok(xml_response(action.as_str(), security_group_xml(&entry), &rid))
906            }
907            "DeleteDBSecurityGroup" => {
908                let name = get_param(req, "DBSecurityGroupName").ok_or_else(|| missing("DBSecurityGroupName"))?;
909                let mut accounts = write_state!();
910                let state = accounts.get_or_create(&aid);
911                if let Some(m) = state.extras.get_mut("security_groups") { m.remove(&name); }
912                xml_empty_action(&action, &rid)
913            }
914            "DescribeDBSecurityGroups" => list_extras_xml(self, &aid, "security_groups", "DBSecurityGroups", "DescribeDBSecurityGroups", security_group_xml, &rid),
915
916            // ── Option groups ──
917            "CreateOptionGroup" | "CopyOptionGroup" => {
918                let name = get_param(req, "OptionGroupName").or_else(|| get_param(req, "TargetOptionGroupIdentifier"))
919                    .ok_or_else(|| missing("OptionGroupName"))?;
920                let arn = Arn::new("rds", region, &aid, &format!("og:{name}")).to_string();
921                let entry = json!({"OptionGroupName": name, "OptionGroupArn": arn, "EngineName": get_param(req, "EngineName").unwrap_or_else(|| "mysql".to_string()), "MajorEngineVersion": get_param(req, "MajorEngineVersion").unwrap_or_else(|| "8.0".to_string()), "OptionGroupDescription": get_param(req, "OptionGroupDescription").unwrap_or_default()});
922                let mut accounts = write_state!();
923                let state = accounts.get_or_create(&aid);
924                store(&mut state.extras, "option_groups").insert(name.clone(), entry.clone());
925                Ok(xml_response(action.as_str(), option_group_xml(&entry), &rid))
926            }
927            "ModifyOptionGroup" => {
928                let name = get_param(req, "OptionGroupName").ok_or_else(|| missing("OptionGroupName"))?;
929                let to_include = parse_options_to_include(req);
930                let to_remove = parse_member_list(req, "OptionsToRemove");
931                let mut accounts = write_state!();
932                let state = accounts.get_or_create(&aid);
933                let entry = state
934                    .extras
935                    .get_mut("option_groups")
936                    .and_then(|m| m.get_mut(&name))
937                    .ok_or_else(|| {
938                        AwsServiceError::aws_error(
939                            StatusCode::NOT_FOUND,
940                            "OptionGroupNotFoundFault",
941                            format!("OptionGroup {name} not found."),
942                        )
943                    })?;
944                if let Some(obj) = entry.as_object_mut() {
945                    // Maintain the effective Options list so DescribeOptionGroups
946                    // reflects what was added/removed: upsert each included option
947                    // by OptionName, then drop any names in OptionsToRemove.
948                    let mut options = obj
949                        .get("Options")
950                        .and_then(|o| o.as_array())
951                        .cloned()
952                        .unwrap_or_default();
953                    for inc in &to_include {
954                        let name = inc["OptionName"].as_str().unwrap_or_default().to_string();
955                        options.retain(|o| o["OptionName"].as_str() != Some(name.as_str()));
956                        options.push(inc.clone());
957                    }
958                    if !to_remove.is_empty() {
959                        options.retain(|o| {
960                            o["OptionName"]
961                                .as_str()
962                                .map(|n| !to_remove.iter().any(|r| r == n))
963                                .unwrap_or(true)
964                        });
965                    }
966                    obj.insert("Options".to_string(), json!(options));
967                }
968                let updated = entry.clone();
969                Ok(xml_response("ModifyOptionGroup", format!("    <OptionGroup>\n{}\n    </OptionGroup>", option_group_xml(&updated)), &rid))
970            }
971            "DeleteOptionGroup" => {
972                let name = get_param(req, "OptionGroupName").ok_or_else(|| missing("OptionGroupName"))?;
973                let mut accounts = write_state!();
974                let state = accounts.get_or_create(&aid);
975                if let Some(m) = state.extras.get_mut("option_groups") { m.remove(&name); }
976                xml_empty_action(&action, &rid)
977            }
978            "DescribeOptionGroups" => {
979                // RDS wraps each list element in its named member tag
980                // (`<OptionGroup>`), not the generic `<member>`; the SDK
981                // unmarshals an empty list otherwise. AWS also filters by name
982                // and raises OptionGroupNotFoundFault for an unknown group.
983                let wanted = get_param(req, "OptionGroupName");
984                let accounts = self.state_handle().read();
985                let groups: Vec<Value> = accounts
986                    .get(&aid)
987                    .and_then(|s| s.extras.get("option_groups"))
988                    .map(|m| m.values().cloned().collect())
989                    .unwrap_or_default();
990                if let Some(name) = &wanted {
991                    let found = groups
992                        .iter()
993                        .any(|g| g["OptionGroupName"].as_str() == Some(name.as_str()));
994                    if !found {
995                        return Err(AwsServiceError::aws_error(
996                            StatusCode::NOT_FOUND,
997                            "OptionGroupNotFoundFault",
998                            format!("Specified OptionGroup: {name} not found."),
999                        ));
1000                    }
1001                }
1002                let members = groups
1003                    .iter()
1004                    .filter(|g| {
1005                        wanted
1006                            .as_deref()
1007                            .is_none_or(|n| g["OptionGroupName"].as_str() == Some(n))
1008                    })
1009                    .map(|g| {
1010                        format!(
1011                            "        <OptionGroup>\n{}\n        </OptionGroup>",
1012                            option_group_xml(g)
1013                        )
1014                    })
1015                    .collect::<Vec<_>>()
1016                    .join("\n");
1017                Ok(xml_response(
1018                    "DescribeOptionGroups",
1019                    format!("    <OptionGroupsList>\n{members}\n    </OptionGroupsList>"),
1020                    &rid,
1021                ))
1022            }
1023            "DescribeOptionGroupOptions" => Ok(xml_response("DescribeOptionGroupOptions", "    <OptionGroupOptions/>".to_string(), &rid)),
1024
1025            // ── Event subscriptions ──
1026            "CreateEventSubscription" => {
1027                let name = get_param(req, "SubscriptionName").ok_or_else(|| missing("SubscriptionName"))?;
1028                let arn = Arn::new("rds", region, &aid, &format!("es:{name}")).to_string();
1029                let source_ids = parse_member_list(req, "SourceIds");
1030                let event_categories = parse_member_list(req, "EventCategories");
1031                let entry = json!({"CustSubscriptionId": name, "CustomerAwsId": aid, "EventSubscriptionArn": arn, "SnsTopicArn": get_param(req, "SnsTopicArn").unwrap_or_default(), "SourceType": get_param(req, "SourceType").unwrap_or_default(), "Status": "active", "Enabled": true, "SourceIdsList": source_ids, "EventCategoriesList": event_categories});
1032                let mut accounts = write_state!();
1033                let state = accounts.get_or_create(&aid);
1034                store(&mut state.extras, "event_subscriptions").insert(name.clone(), entry.clone());
1035                Ok(xml_response("CreateEventSubscription", format!("    <EventSubscription>\n{}\n    </EventSubscription>", event_sub_xml(&entry)), &rid))
1036            }
1037            "ModifyEventSubscription" => {
1038                let name = get_param(req, "SubscriptionName").ok_or_else(|| missing("SubscriptionName"))?;
1039                let mut accounts = write_state!();
1040                let state = accounts.get_or_create(&aid);
1041                let entry = state
1042                    .extras
1043                    .get_mut("event_subscriptions")
1044                    .and_then(|m| m.get_mut(&name))
1045                    .ok_or_else(|| {
1046                        AwsServiceError::aws_error(
1047                            StatusCode::NOT_FOUND,
1048                            "SubscriptionNotFound",
1049                            format!("EventSubscription {name} not found."),
1050                        )
1051                    })?;
1052                if let Some(obj) = entry.as_object_mut() {
1053                    if let Some(v) = get_param(req, "SnsTopicArn") {
1054                        obj.insert("SnsTopicArn".to_string(), json!(v));
1055                    }
1056                    if let Some(v) = get_param(req, "SourceType") {
1057                        obj.insert("SourceType".to_string(), json!(v));
1058                    }
1059                    if let Some(v) = get_param(req, "Enabled") {
1060                        obj.insert("Enabled".to_string(), json!(v.eq_ignore_ascii_case("true")));
1061                    }
1062                }
1063                let updated = entry.clone();
1064                Ok(xml_response("ModifyEventSubscription", format!("    <EventSubscription>\n{}\n    </EventSubscription>", event_sub_xml(&updated)), &rid))
1065            }
1066            "DeleteEventSubscription" => {
1067                let name = get_param(req, "SubscriptionName").ok_or_else(|| missing("SubscriptionName"))?;
1068                let mut accounts = write_state!();
1069                let state = accounts.get_or_create(&aid);
1070                if let Some(m) = state.extras.get_mut("event_subscriptions") { m.remove(&name); }
1071                Ok(xml_response("DeleteEventSubscription", "    <EventSubscription/>".to_string(), &rid))
1072            }
1073            "DescribeEventSubscriptions" => {
1074                let wanted = get_param(req, "SubscriptionName");
1075                list_extras_named_xml(self, &aid, "event_subscriptions", "EventSubscriptionsList", "EventSubscription", "DescribeEventSubscriptions", event_sub_xml, wanted.as_deref(), "CustSubscriptionId", "SubscriptionNotFound", &rid)
1076            }
1077            "AddSourceIdentifierToSubscription" | "RemoveSourceIdentifierFromSubscription" => {
1078                let name = get_param(req, "SubscriptionName").ok_or_else(|| missing("SubscriptionName"))?;
1079                let source_id = get_param(req, "SourceIdentifier");
1080                let adding = action.as_str() == "AddSourceIdentifierToSubscription";
1081                let mut accounts = write_state!();
1082                let state = accounts.get_or_create(&aid);
1083                let entry = state
1084                    .extras
1085                    .get_mut("event_subscriptions")
1086                    .and_then(|m| m.get_mut(&name))
1087                    .ok_or_else(|| {
1088                        AwsServiceError::aws_error(
1089                            StatusCode::NOT_FOUND,
1090                            "SubscriptionNotFound",
1091                            format!("Subscription {name} not found."),
1092                        )
1093                    })?;
1094                if let (Some(obj), Some(sid)) = (entry.as_object_mut(), source_id) {
1095                    let list = obj
1096                        .entry("SourceIdsList".to_string())
1097                        .or_insert_with(|| json!([]));
1098                    if let Some(arr) = list.as_array_mut() {
1099                        arr.retain(|v| v.as_str() != Some(sid.as_str()));
1100                        if adding {
1101                            arr.push(json!(sid));
1102                        }
1103                    }
1104                }
1105                let updated = entry.clone();
1106                Ok(xml_response(action.as_str(), format!("    <EventSubscription>\n{}\n    </EventSubscription>", event_sub_xml(&updated)), &rid))
1107            }
1108
1109            // ── Global clusters ──
1110            "CreateGlobalCluster" => {
1111                let id = get_param(req, "GlobalClusterIdentifier").ok_or_else(|| missing("GlobalClusterIdentifier"))?;
1112                let arn = Arn::global("rds", &aid, &format!("global-cluster:{id}")).to_string();
1113                let entry = json!({
1114                    "GlobalClusterIdentifier": id,
1115                    "GlobalClusterArn": arn,
1116                    "GlobalClusterResourceId": new_cluster_resource_id(),
1117                    "Endpoint": format!("{id}.global.{region}.rds.amazonaws.com"),
1118                    "Status": "available",
1119                    "Engine": get_param(req, "Engine").unwrap_or_else(|| "aurora-postgresql".to_string()),
1120                    "EngineVersion": get_param(req, "EngineVersion").unwrap_or_else(|| "16.4".to_string()),
1121                    "EngineLifecycleSupport": get_param(req, "EngineLifecycleSupport").unwrap_or_else(|| "open-source-rds-extended-support".to_string()),
1122                    "DatabaseName": get_param(req, "DatabaseName").unwrap_or_default(),
1123                    "DeletionProtection": get_param(req, "DeletionProtection").map(|v| v.eq_ignore_ascii_case("true")).unwrap_or(false),
1124                    "StorageEncrypted": get_param(req, "StorageEncrypted").map(|v| v.eq_ignore_ascii_case("true")).unwrap_or(false),
1125                });
1126                let mut accounts = write_state!();
1127                let state = accounts.get_or_create(&aid);
1128                store(&mut state.extras, "global_clusters").insert(id.clone(), entry.clone());
1129                Ok(xml_response("CreateGlobalCluster", format!("    <GlobalCluster>\n{}\n    </GlobalCluster>", global_cluster_xml(&entry)), &rid))
1130            }
1131            "ModifyGlobalCluster" | "FailoverGlobalCluster" | "SwitchoverGlobalCluster" | "RemoveFromGlobalCluster" => {
1132                let id = get_param(req, "GlobalClusterIdentifier").ok_or_else(|| missing("GlobalClusterIdentifier"))?;
1133                let new_id = get_param(req, "NewGlobalClusterIdentifier");
1134                let deletion_protection = get_param(req, "DeletionProtection")
1135                    .map(|v| v.eq_ignore_ascii_case("true"));
1136                let engine_version = get_param(req, "EngineVersion");
1137                let updated = {
1138                    let mut accounts = write_state!();
1139                    let state = accounts.get_or_create(&aid);
1140                    let map = state
1141                        .extras
1142                        .get_mut("global_clusters")
1143                        .ok_or_else(|| {
1144                            AwsServiceError::aws_error(
1145                                StatusCode::NOT_FOUND,
1146                                "GlobalClusterNotFoundFault",
1147                                format!("{id} not found."),
1148                            )
1149                        })?;
1150                    let mut entry = map.get(&id).cloned().ok_or_else(|| {
1151                        AwsServiceError::aws_error(
1152                            StatusCode::NOT_FOUND,
1153                            "GlobalClusterNotFoundFault",
1154                            format!("{id} not found."),
1155                        )
1156                    })?;
1157                    if let Some(obj) = entry.as_object_mut() {
1158                        if action.as_str() == "ModifyGlobalCluster" {
1159                            if let Some(dp) = deletion_protection {
1160                                obj.insert("DeletionProtection".to_string(), json!(dp));
1161                            }
1162                            if let Some(ev) = &engine_version {
1163                                obj.insert("EngineVersion".to_string(), json!(ev));
1164                            }
1165                            if let Some(nid) = &new_id {
1166                                obj.insert("GlobalClusterIdentifier".to_string(), json!(nid));
1167                            }
1168                        }
1169                    }
1170                    // A rename re-keys the stored map entry.
1171                    if action.as_str() == "ModifyGlobalCluster" {
1172                        if let Some(nid) = &new_id {
1173                            map.remove(&id);
1174                            map.insert(nid.clone(), entry.clone());
1175                        } else {
1176                            map.insert(id.clone(), entry.clone());
1177                        }
1178                    }
1179                    entry
1180                };
1181                Ok(xml_response(action.as_str(), format!("    <GlobalCluster>\n{}\n    </GlobalCluster>", global_cluster_xml(&updated)), &rid))
1182            }
1183            "DeleteGlobalCluster" => {
1184                let id = get_param(req, "GlobalClusterIdentifier").ok_or_else(|| missing("GlobalClusterIdentifier"))?;
1185                let mut accounts = write_state!();
1186                let state = accounts.get_or_create(&aid);
1187                if let Some(m) = state.extras.get_mut("global_clusters") { m.remove(&id); }
1188                Ok(xml_response("DeleteGlobalCluster", "    <GlobalCluster/>".to_string(), &rid))
1189            }
1190            "DescribeGlobalClusters" => {
1191                let wanted = get_param(req, "GlobalClusterIdentifier");
1192                // RDS names this list's member `<GlobalClusterMember>`, not the
1193                // usual singular-of-wrapper (`<GlobalCluster>`); the SDK
1194                // unmarshals an empty list with any other tag.
1195                list_extras_named_xml(self, &aid, "global_clusters", "GlobalClusters", "GlobalClusterMember", "DescribeGlobalClusters", global_cluster_xml, wanted.as_deref(), "GlobalClusterIdentifier", "GlobalClusterNotFoundFault", &rid)
1196            }
1197
1198            // ── Integrations ──
1199            "CreateIntegration" => {
1200                let name = get_param(req, "IntegrationName").ok_or_else(|| missing("IntegrationName"))?;
1201                let arn = Arn::new("rds", region, &aid, &format!("integration:{name}")).to_string();
1202                let entry = json!({"IntegrationName": name, "IntegrationArn": arn, "Status": "active"});
1203                let mut accounts = write_state!();
1204                let state = accounts.get_or_create(&aid);
1205                store(&mut state.extras, "integrations").insert(name.clone(), entry.clone());
1206                Ok(xml_response("CreateIntegration", integration_xml(&entry), &rid))
1207            }
1208            "ModifyIntegration" => {
1209                let ident = get_param(req, "IntegrationIdentifier")
1210                    .or_else(|| get_param(req, "IntegrationName"))
1211                    .ok_or_else(|| missing("IntegrationIdentifier"))?;
1212                let data_filter = get_param(req, "DataFilter");
1213                let description = get_param(req, "Description");
1214                let new_name = get_param(req, "IntegrationName");
1215                let updated = {
1216                    let mut accounts = write_state!();
1217                    let state = accounts.get_or_create(&aid);
1218                    let map = state.extras.get_mut("integrations").ok_or_else(|| {
1219                        AwsServiceError::aws_error(
1220                            StatusCode::NOT_FOUND,
1221                            "IntegrationNotFoundFault",
1222                            format!("Integration {ident} not found."),
1223                        )
1224                    })?;
1225                    // The identifier may be the integration name or its ARN.
1226                    let key = map
1227                        .iter()
1228                        .find(|(k, v)| {
1229                            k.as_str() == ident || v["IntegrationArn"].as_str() == Some(ident.as_str())
1230                        })
1231                        .map(|(k, _)| k.clone())
1232                        .ok_or_else(|| {
1233                            AwsServiceError::aws_error(
1234                                StatusCode::NOT_FOUND,
1235                                "IntegrationNotFoundFault",
1236                                format!("Integration {ident} not found."),
1237                            )
1238                        })?;
1239                    let mut entry = map.get(&key).cloned().unwrap_or(json!({}));
1240                    if let Some(obj) = entry.as_object_mut() {
1241                        if let Some(v) = &data_filter {
1242                            obj.insert("DataFilter".to_string(), json!(v));
1243                        }
1244                        if let Some(v) = &description {
1245                            obj.insert("Description".to_string(), json!(v));
1246                        }
1247                        if let Some(v) = &new_name {
1248                            obj.insert("IntegrationName".to_string(), json!(v));
1249                        }
1250                    }
1251                    map.insert(key, entry.clone());
1252                    entry
1253                };
1254                Ok(xml_response("ModifyIntegration", format!("    <Integration>\n{}\n    </Integration>", integration_xml(&updated)), &rid))
1255            }
1256            "DeleteIntegration" => {
1257                let name = get_param(req, "IntegrationIdentifier").or_else(|| get_param(req, "IntegrationName")).ok_or_else(|| missing("IntegrationIdentifier"))?;
1258                let mut accounts = write_state!();
1259                let state = accounts.get_or_create(&aid);
1260                if let Some(m) = state.extras.get_mut("integrations") { m.remove(&name); }
1261                Ok(xml_response("DeleteIntegration", "    <Integration/>".to_string(), &rid))
1262            }
1263            "DescribeIntegrations" => list_extras_xml(self, &aid, "integrations", "Integrations", "DescribeIntegrations", integration_xml, &rid),
1264
1265            // ── Blue/Green deployments ──
1266            "CreateBlueGreenDeployment" => {
1267                let id = format!("bgd-{}", rand_id());
1268                let arn = Arn::new("rds", region, &aid, &format!("blue-green-deployment:{id}"))
1269                    .to_string();
1270                let source_arn = get_param(req, "Source")
1271                    .ok_or_else(|| missing("Source"))?;
1272                let source_id = source_arn
1273                    .rsplit(':')
1274                    .next()
1275                    .map(|s| s.to_string())
1276                    .unwrap_or_default();
1277                let target_id = get_param(req, "TargetDBInstanceName")
1278                    .unwrap_or_else(|| format!("{source_id}-green-{}", rand_id()));
1279                let mut accounts = write_state!();
1280                let state = accounts.get_or_create(&aid);
1281                let source_arn_full = if source_arn.starts_with("arn:") {
1282                    source_arn.clone()
1283                } else {
1284                    state.db_instance_arn(&source_id)
1285                };
1286                let target_arn = state.db_instance_arn(&target_id);
1287                // AWS accepts either a DBInstance ARN or an Aurora
1288                // DBCluster ARN as the BG source. Look up under both
1289                // the real instance store and the cluster map under
1290                // `state.extras["clusters"]`; absent both, surface
1291                // DBInstanceNotFound (matching what AWS emits for the
1292                // more common DBInstance source).
1293                let instance_exists = state.instances.contains_key(&source_id);
1294                let cluster_exists = state
1295                    .extras
1296                    .get("clusters")
1297                    .map(|m| m.contains_key(&source_id))
1298                    .unwrap_or(false);
1299                if !instance_exists && !cluster_exists {
1300                    return Err(AwsServiceError::aws_error(
1301                        StatusCode::NOT_FOUND,
1302                        "DBInstanceNotFound",
1303                        format!("DBInstance {source_id} not found."),
1304                    ));
1305                }
1306                // Cluster sources require their own provisioning path:
1307                // clone the source cluster entry under the green id and
1308                // record the cluster ARNs in the BG record so a later
1309                // SwitchoverBlueGreenDeployment can operate on something
1310                // real.
1311                let target_arn_for_record = if cluster_exists {
1312                    let source_cluster = state
1313                        .extras
1314                        .get("clusters")
1315                        .and_then(|m| m.get(&source_id))
1316                        .cloned();
1317                    if let Some(mut green_cluster) = source_cluster {
1318                        let green_arn =
1319                            Arn::new("rds", region, &aid, &format!("cluster:{target_id}"))
1320                                .to_string();
1321                        if let Some(obj) = green_cluster.as_object_mut() {
1322                            obj.insert(
1323                                "DBClusterIdentifier".to_string(),
1324                                json!(target_id.clone()),
1325                            );
1326                            obj.insert("DBClusterArn".to_string(), json!(green_arn.clone()));
1327                            obj.insert("Status".to_string(), json!("available"));
1328                        }
1329                        store(&mut state.extras, "clusters")
1330                            .insert(target_id.clone(), green_cluster);
1331                        green_arn
1332                    } else {
1333                        target_arn.clone()
1334                    }
1335                } else if let Some(source) = state.instances.get(&source_id).cloned() {
1336                    let mut green = source.clone();
1337                    green.db_instance_identifier = target_id.clone();
1338                    green.db_instance_arn = target_arn.clone();
1339                    green.read_replica_db_instance_identifiers = Vec::new();
1340                    green.read_replica_source_db_instance_identifier = Some(source_id.clone());
1341                    green.dbi_resource_id = format!("db-{}", uuid::Uuid::new_v4().simple());
1342                    state.instances.insert(target_id.clone(), green);
1343                    target_arn.clone()
1344                } else {
1345                    target_arn.clone()
1346                };
1347                let entry = json!({
1348                    "BlueGreenDeploymentIdentifier": id,
1349                    "BlueGreenDeploymentName": get_param(req, "BlueGreenDeploymentName").unwrap_or_else(|| "blue-green".to_string()),
1350                    "Status": "AVAILABLE",
1351                    "Source": source_arn_full,
1352                    "Target": target_arn_for_record,
1353                    "SourceDBInstanceIdentifier": source_id,
1354                    "TargetDBInstanceIdentifier": target_id,
1355                    "SourceIsCluster": cluster_exists && !instance_exists,
1356                    "BlueGreenDeploymentArn": arn,
1357                });
1358                store(&mut state.extras, "blue_green").insert(id.clone(), entry.clone());
1359                Ok(xml_response("CreateBlueGreenDeployment", blue_green_xml(&entry), &rid))
1360            }
1361            "SwitchoverBlueGreenDeployment" => {
1362                let id = get_param(req, "BlueGreenDeploymentIdentifier")
1363                    .ok_or_else(|| missing("BlueGreenDeploymentIdentifier"))?;
1364                let mut accounts = write_state!();
1365                let state = accounts.get_or_create(&aid);
1366                let entry = state
1367                    .extras
1368                    .get("blue_green")
1369                    .and_then(|m| m.get(&id))
1370                    .cloned()
1371                    .ok_or_else(|| {
1372                        AwsServiceError::aws_error(
1373                            StatusCode::NOT_FOUND,
1374                            "BlueGreenDeploymentNotFoundFault",
1375                            format!("BlueGreenDeployment {id} not found."),
1376                        )
1377                    })?;
1378                let source_id = entry["SourceDBInstanceIdentifier"]
1379                    .as_str()
1380                    .unwrap_or("")
1381                    .to_string();
1382                let target_id = entry["TargetDBInstanceIdentifier"]
1383                    .as_str()
1384                    .unwrap_or("")
1385                    .to_string();
1386                if !source_id.is_empty() && !target_id.is_empty() {
1387                    let blue = state.instances.get(&source_id).cloned();
1388                    let green = state.instances.get(&target_id).cloned();
1389                    if let (Some(mut b), Some(mut g)) = (blue, green) {
1390                        // Swap endpoints (and host_port) so callers
1391                        // pointing at the blue address now reach the
1392                        // green container, mirroring AWS BG cutover.
1393                        std::mem::swap(&mut b.endpoint_address, &mut g.endpoint_address);
1394                        std::mem::swap(&mut b.port, &mut g.port);
1395                        std::mem::swap(&mut b.host_port, &mut g.host_port);
1396                        std::mem::swap(&mut b.container_id, &mut g.container_id);
1397                        // Green is now the writer; clear its replica
1398                        // pointer back at the old blue.
1399                        g.read_replica_source_db_instance_identifier = None;
1400                        state.instances.insert(source_id.clone(), b);
1401                        state.instances.insert(target_id.clone(), g);
1402                    }
1403                }
1404                if let Some(map) = state.extras.get_mut("blue_green") {
1405                    if let Some(e) = map.get_mut(&id) {
1406                        if let Some(obj) = e.as_object_mut() {
1407                            obj.insert("Status".to_string(), json!("SWITCHOVER_COMPLETED"));
1408                        }
1409                    }
1410                }
1411                let updated = state
1412                    .extras
1413                    .get("blue_green")
1414                    .and_then(|m| m.get(&id))
1415                    .cloned()
1416                    .unwrap_or(entry);
1417                Ok(xml_response(
1418                    "SwitchoverBlueGreenDeployment",
1419                    blue_green_xml(&updated),
1420                    &rid,
1421                ))
1422            }
1423            "DeleteBlueGreenDeployment" => {
1424                let id = get_param(req, "BlueGreenDeploymentIdentifier")
1425                    .ok_or_else(|| missing("BlueGreenDeploymentIdentifier"))?;
1426                let delete_target = get_param(req, "DeleteTarget")
1427                    .map(|v| v.eq_ignore_ascii_case("true"))
1428                    .unwrap_or(false);
1429                let mut accounts = write_state!();
1430                let state = accounts.get_or_create(&aid);
1431                let entry = state
1432                    .extras
1433                    .get_mut("blue_green")
1434                    .and_then(|m| m.remove(&id))
1435                    .ok_or_else(|| {
1436                        AwsServiceError::aws_error(
1437                            StatusCode::NOT_FOUND,
1438                            "BlueGreenDeploymentNotFoundFault",
1439                            format!("BlueGreenDeployment {id} not found."),
1440                        )
1441                    })?;
1442                if delete_target {
1443                    if let Some(target_id) = entry["TargetDBInstanceIdentifier"].as_str() {
1444                        state.instances.remove(target_id);
1445                    }
1446                }
1447                Ok(xml_response(
1448                    "DeleteBlueGreenDeployment",
1449                    blue_green_xml(&entry),
1450                    &rid,
1451                ))
1452            }
1453            "DescribeBlueGreenDeployments" => list_extras_xml(self, &aid, "blue_green", "BlueGreenDeployments", "DescribeBlueGreenDeployments", blue_green_xml, &rid),
1454
1455            // ── Shard groups ──
1456            "CreateDBShardGroup" => {
1457                let id = get_param(req, "DBShardGroupIdentifier").ok_or_else(|| missing("DBShardGroupIdentifier"))?;
1458                let entry = json!({"DBShardGroupIdentifier": id, "Status": "available"});
1459                let mut accounts = write_state!();
1460                let state = accounts.get_or_create(&aid);
1461                store(&mut state.extras, "shard_groups").insert(id.clone(), entry.clone());
1462                Ok(xml_response("CreateDBShardGroup", shard_group_xml(&entry), &rid))
1463            }
1464            "ModifyDBShardGroup" | "RebootDBShardGroup" => Ok(xml_response(action.as_str(), "    <DBShardGroup/>".to_string(), &rid)),
1465            "DeleteDBShardGroup" => {
1466                let id = get_param(req, "DBShardGroupIdentifier").ok_or_else(|| missing("DBShardGroupIdentifier"))?;
1467                let mut accounts = write_state!();
1468                let state = accounts.get_or_create(&aid);
1469                if let Some(m) = state.extras.get_mut("shard_groups") { m.remove(&id); }
1470                Ok(xml_response("DeleteDBShardGroup", "    <DBShardGroup/>".to_string(), &rid))
1471            }
1472            "DescribeDBShardGroups" => list_extras_xml(self, &aid, "shard_groups", "DBShardGroups", "DescribeDBShardGroups", shard_group_xml, &rid),
1473
1474            // ── Custom engine versions ──
1475            "CreateCustomDBEngineVersion" | "ModifyCustomDBEngineVersion" => {
1476                let v = get_param(req, "EngineVersion").unwrap_or_else(|| "1.0".to_string());
1477                let engine = get_param(req, "Engine").unwrap_or_else(|| "custom-oracle-ee".to_string());
1478                let entry = json!({"Engine": engine, "EngineVersion": v, "Status": "available"});
1479                let mut accounts = write_state!();
1480                let state = accounts.get_or_create(&aid);
1481                store(&mut state.extras, "custom_engine_versions").insert(v.clone(), entry.clone());
1482                Ok(xml_response(action.as_str(), engine_version_xml(&entry), &rid))
1483            }
1484            "DeleteCustomDBEngineVersion" => Ok(xml_response("DeleteCustomDBEngineVersion", "    <DBEngineVersion/>".to_string(), &rid)),
1485
1486            // ── Tenant databases ──
1487            "CreateTenantDatabase" => {
1488                let name = get_param(req, "TenantDBName").ok_or_else(|| missing("TenantDBName"))?;
1489                let entry = json!({"TenantDBName": name, "Status": "available"});
1490                let mut accounts = write_state!();
1491                let state = accounts.get_or_create(&aid);
1492                store(&mut state.extras, "tenant_dbs").insert(name.clone(), entry.clone());
1493                Ok(xml_response("CreateTenantDatabase", tenant_db_xml(&entry), &rid))
1494            }
1495            "ModifyTenantDatabase" => {
1496                let _instance = get_param(req, "DBInstanceIdentifier").ok_or_else(|| missing("DBInstanceIdentifier"))?;
1497                let name = get_param(req, "TenantDBName").ok_or_else(|| missing("TenantDBName"))?;
1498                let new_name = get_param(req, "NewTenantDBName");
1499                let new_password = get_param(req, "MasterUserPassword");
1500                let mut accounts = write_state!();
1501                let state = accounts.get_or_create(&aid);
1502                let entry = state
1503                    .extras
1504                    .get_mut("tenant_dbs")
1505                    .and_then(|m| m.remove(&name))
1506                    .ok_or_else(|| {
1507                        AwsServiceError::aws_error(
1508                            StatusCode::NOT_FOUND,
1509                            "TenantDatabaseNotFound",
1510                            format!("TenantDatabase {name} not found."),
1511                        )
1512                    })?;
1513                let mut updated = entry;
1514                let final_name = new_name.clone().unwrap_or_else(|| name.clone());
1515                if let Some(obj) = updated.as_object_mut() {
1516                    obj.insert("TenantDBName".to_string(), json!(final_name));
1517                    if let Some(p) = new_password {
1518                        obj.insert("MasterUserPassword".to_string(), json!(p));
1519                    }
1520                }
1521                store(&mut state.extras, "tenant_dbs").insert(final_name, updated.clone());
1522                Ok(xml_response("ModifyTenantDatabase", format!("    <TenantDatabase>\n{}\n    </TenantDatabase>", tenant_db_xml(&updated)), &rid))
1523            }
1524            "DeleteTenantDatabase" => {
1525                let name = get_param(req, "TenantDBName").ok_or_else(|| missing("TenantDBName"))?;
1526                let mut accounts = write_state!();
1527                let state = accounts.get_or_create(&aid);
1528                if let Some(m) = state.extras.get_mut("tenant_dbs") { m.remove(&name); }
1529                Ok(xml_response("DeleteTenantDatabase", "    <TenantDatabase/>".to_string(), &rid))
1530            }
1531            "DescribeTenantDatabases" => list_extras_xml(self, &aid, "tenant_dbs", "TenantDatabases", "DescribeTenantDatabases", tenant_db_xml, &rid),
1532            "DescribeDBSnapshotTenantDatabases" => Ok(xml_response("DescribeDBSnapshotTenantDatabases", "    <DBSnapshotTenantDatabases/>".to_string(), &rid)),
1533
1534            // ── Export tasks ──
1535            "StartExportTask" => {
1536                let id = get_param(req, "ExportTaskIdentifier").ok_or_else(|| missing("ExportTaskIdentifier"))?;
1537                let entry = json!({"ExportTaskIdentifier": id, "Status": "STARTING"});
1538                let mut accounts = write_state!();
1539                let state = accounts.get_or_create(&aid);
1540                store(&mut state.extras, "export_tasks").insert(id.clone(), entry.clone());
1541                Ok(xml_response("StartExportTask", export_task_xml(&entry), &rid))
1542            }
1543            "CancelExportTask" => Ok(xml_response("CancelExportTask", "    <ExportTask/>".to_string(), &rid)),
1544            "DescribeExportTasks" => list_extras_xml(self, &aid, "export_tasks", "ExportTasks", "DescribeExportTasks", export_task_xml, &rid),
1545
1546            // ── Activity stream ──
1547            "StartActivityStream" => {
1548                let kms_input = get_param(req, "KmsKeyId").unwrap_or_default();
1549                let kms_arn = format_kms_arn(&kms_input, region, &aid);
1550                let mode = get_param(req, "Mode").unwrap_or_else(|| "async".to_string());
1551                let resource_arn = get_param(req, "ResourceArn").unwrap_or_default();
1552                let stream = if resource_arn.is_empty() {
1553                    "aws-rds-das".to_string()
1554                } else {
1555                    let id = resource_arn.rsplit(':').next().unwrap_or("default");
1556                    format!("aws-rds-das-{id}")
1557                };
1558                Ok(xml_response("StartActivityStream", format!("    <Status>started</Status>\n    <KmsKeyId>{}</KmsKeyId>\n    <KinesisStreamName>{}</KinesisStreamName>\n    <Mode>{}</Mode>\n    <ApplyImmediately>true</ApplyImmediately>", xml_escape(&kms_arn), xml_escape(&stream), xml_escape(&mode)), &rid))
1559            }
1560            "StopActivityStream" => Ok(xml_response("StopActivityStream", "    <Status>stopped</Status>".to_string(), &rid)),
1561            "ModifyActivityStream" => Ok(xml_response("ModifyActivityStream", "    <Status>started</Status>".to_string(), &rid)),
1562
1563            // ── Database read replicas ──
1564            "PromoteReadReplica" => promote_read_replica_action(self, &aid, req, &rid),
1565            "SwitchoverReadReplica" => switchover_read_replica_action(self, &aid, req, &rid),
1566            "StartDBInstanceAutomatedBackupsReplication" | "StopDBInstanceAutomatedBackupsReplication" => Ok(xml_response(action.as_str(), "    <DBInstanceAutomatedBackup/>".to_string(), &rid)),
1567            "DeleteDBInstanceAutomatedBackup" => Ok(xml_response("DeleteDBInstanceAutomatedBackup", "    <DBInstanceAutomatedBackup/>".to_string(), &rid)),
1568            "DescribeDBInstanceAutomatedBackups" => Ok(xml_response("DescribeDBInstanceAutomatedBackups", "    <DBInstanceAutomatedBackups/>".to_string(), &rid)),
1569
1570            // ── Roles ──
1571            "AddRoleToDBCluster" | "RemoveRoleFromDBCluster" | "AddRoleToDBInstance" | "RemoveRoleFromDBInstance" => xml_empty_action(&action, &rid),
1572
1573            // ── Pending maintenance ──
1574            "ApplyPendingMaintenanceAction" => {
1575                let resource = get_param(req, "ResourceIdentifier").ok_or_else(|| missing("ResourceIdentifier"))?;
1576                let _action_kind = get_param(req, "ApplyAction").ok_or_else(|| missing("ApplyAction"))?;
1577                let _opt_in = get_param(req, "OptInType").ok_or_else(|| missing("OptInType"))?;
1578                let (kind, id) = parse_rds_resource_arn(&resource);
1579                let mut accounts = write_state!();
1580                let state = accounts.get_or_create(&aid);
1581                match kind {
1582                    Some("db") => {
1583                        if let Some(inst) = state.instances.get_mut(&id) {
1584                            if let Some(pending) = inst.pending_modified_values.take() {
1585                                crate::service::apply_pending_to_instance(inst, pending);
1586                            }
1587                        }
1588                    }
1589                    Some("cluster") => {
1590                        if let Some(map) = state.extras.get_mut("clusters") {
1591                            if let Some(entry) = map.get_mut(&id) {
1592                                if let Some(obj) = entry.as_object_mut() {
1593                                    obj.remove("PendingModifiedValues");
1594                                }
1595                            }
1596                        }
1597                    }
1598                    _ => {}
1599                }
1600                Ok(xml_response("ApplyPendingMaintenanceAction", format!("    <ResourcePendingMaintenanceActions>\n      <ResourceIdentifier>{}</ResourceIdentifier>\n      <PendingMaintenanceActionDetails/>\n    </ResourcePendingMaintenanceActions>", xml_escape(&resource)), &rid))
1601            }
1602            "DescribePendingMaintenanceActions" => Ok(xml_response("DescribePendingMaintenanceActions", "    <PendingMaintenanceActions/>".to_string(), &rid)),
1603
1604            // ── Reserved instances ──
1605            "PurchaseReservedDBInstancesOffering" => Ok(xml_response("PurchaseReservedDBInstancesOffering", "    <ReservedDBInstance/>".to_string(), &rid)),
1606            "DescribeReservedDBInstances" => Ok(xml_response("DescribeReservedDBInstances", "    <ReservedDBInstances/>".to_string(), &rid)),
1607            "DescribeReservedDBInstancesOfferings" => Ok(xml_response("DescribeReservedDBInstancesOfferings", "    <ReservedDBInstancesOfferings/>".to_string(), &rid)),
1608
1609            // ── Snapshots / restores / copy ──
1610            "CopyDBSnapshot" => {
1611                let id = get_param(req, "TargetDBSnapshotIdentifier").ok_or_else(|| missing("TargetDBSnapshotIdentifier"))?;
1612                Ok(xml_response("CopyDBSnapshot", format!("    <DBSnapshot>\n      <DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\n      <Status>available</Status>\n    </DBSnapshot>", xml_escape(&id)), &rid))
1613            }
1614            "CopyDBParameterGroup" => {
1615                let name = get_param(req, "TargetDBParameterGroupIdentifier").ok_or_else(|| missing("TargetDBParameterGroupIdentifier"))?;
1616                Ok(xml_response("CopyDBParameterGroup", format!("    <DBParameterGroup>\n      <DBParameterGroupName>{}</DBParameterGroupName>\n    </DBParameterGroup>", xml_escape(&name)), &rid))
1617            }
1618            "DescribeDBParameters" => Ok(xml_response("DescribeDBParameters", "    <Parameters/>".to_string(), &rid)),
1619            "ResetDBParameterGroup" => {
1620                let name = get_param(req, "DBParameterGroupName").ok_or_else(|| missing("DBParameterGroupName"))?;
1621                // Resetting a parameter flips its source back from `user` to
1622                // `engine-default`, so we drop it from the user-set map. With
1623                // `ResetAllParameters=true` (or no explicit list) every user
1624                // value is cleared; otherwise only the named parameters are.
1625                let reset_all = get_param(req, "ResetAllParameters")
1626                    .map(|v| v.eq_ignore_ascii_case("true"))
1627                    .unwrap_or(false);
1628                let named: Vec<String> = crate::service::parse_db_parameter_members(req)
1629                    .into_iter()
1630                    .map(|p| p.name)
1631                    .collect();
1632                {
1633                    let mut accounts = write_state!();
1634                    let state = accounts.get_or_create(&aid);
1635                    if let Some(group) = state.parameter_groups.get_mut(&name) {
1636                        if reset_all || named.is_empty() {
1637                            group.parameters.clear();
1638                            group.parameter_apply_methods.clear();
1639                        } else {
1640                            for n in &named {
1641                                group.parameters.remove(n);
1642                                group.parameter_apply_methods.remove(n);
1643                            }
1644                        }
1645                    }
1646                }
1647                Ok(xml_response("ResetDBParameterGroup", format!("    <DBParameterGroupName>{}</DBParameterGroupName>", xml_escape(&name)), &rid))
1648            }
1649            "DescribeEngineDefaultParameters" => {
1650                let family = get_param(req, "DBParameterGroupFamily").unwrap_or_else(|| "postgres16".to_string());
1651                let mut members = String::new();
1652                for default in crate::state::engine_default_parameters(&family) {
1653                    members.push_str(&crate::service::render_engine_default_parameter_xml(default));
1654                }
1655                let body = format!(
1656                    "    <EngineDefaults>\n      <DBParameterGroupFamily>{}</DBParameterGroupFamily>\n      <Parameters>\n{}      </Parameters>\n    </EngineDefaults>",
1657                    xml_escape(&family),
1658                    members,
1659                );
1660                Ok(xml_response("DescribeEngineDefaultParameters", body, &rid))
1661            }
1662            "DescribeDBSnapshotAttributes" => Ok(xml_response("DescribeDBSnapshotAttributes", "    <DBSnapshotAttributesResult>\n      <DBSnapshotAttributes/>\n    </DBSnapshotAttributesResult>".to_string(), &rid)),
1663            "ModifyDBSnapshot" | "ModifyDBSnapshotAttribute" => Ok(xml_response(action.as_str(), "    <DBSnapshot/>".to_string(), &rid)),
1664            "RestoreDBClusterFromSnapshot" => {
1665                let target = get_param(req, "DBClusterIdentifier")
1666                    .ok_or_else(|| missing("DBClusterIdentifier"))?;
1667                let snapshot_id = get_param(req, "SnapshotIdentifier")
1668                    .or_else(|| get_param(req, "DBClusterSnapshotIdentifier"))
1669                    .ok_or_else(|| missing("SnapshotIdentifier"))?;
1670                let arn = Arn::new("rds", region, &aid, &format!("cluster:{target}")).to_string();
1671                let mut accounts = write_state!();
1672                let state = accounts.get_or_create(&aid);
1673                let snapshot = state
1674                    .extras
1675                    .get("cluster_snapshots")
1676                    .and_then(|m| m.get(&snapshot_id))
1677                    .cloned()
1678                    .ok_or_else(|| {
1679                        AwsServiceError::aws_error(
1680                            StatusCode::NOT_FOUND,
1681                            "DBClusterSnapshotNotFoundFault",
1682                            format!("DBClusterSnapshot {snapshot_id} not found."),
1683                        )
1684                    })?;
1685                let source_cluster_id = snapshot
1686                    .get("DBClusterIdentifier")
1687                    .and_then(|v| v.as_str())
1688                    .unwrap_or("");
1689                let pending_dump_b64 = snapshot
1690                    .get("DumpDataB64")
1691                    .and_then(|v| v.as_str())
1692                    .map(str::to_string);
1693                let mut entry = state
1694                    .extras
1695                    .get("clusters")
1696                    .and_then(|m| m.get(source_cluster_id))
1697                    .cloned()
1698                    .unwrap_or_else(|| {
1699                        json!({
1700                            "Engine": get_param(req, "Engine").unwrap_or_else(|| "aurora-postgresql".to_string()),
1701                            "EngineVersion": get_param(req, "EngineVersion").unwrap_or_else(|| "15.3".to_string()),
1702                            "MasterUsername": "postgres",
1703                            "Port": 5432,
1704                        })
1705                    });
1706                if let Some(obj) = entry.as_object_mut() {
1707                    obj.insert("DBClusterIdentifier".to_string(), json!(target));
1708                    obj.insert("DBClusterArn".to_string(), json!(arn));
1709                    obj.insert("Status".to_string(), json!("available"));
1710                    obj.insert(
1711                        "Endpoint".to_string(),
1712                        json!(format!("{target}.cluster-xxx.{region}.rds.amazonaws.com")),
1713                    );
1714                    obj.insert(
1715                        "ReaderEndpoint".to_string(),
1716                        json!(format!("{target}.cluster-ro-xxx.{region}.rds.amazonaws.com")),
1717                    );
1718                    // Restored cluster is a distinct resource: mint a fresh
1719                    // immutable resource id rather than reuse the source's.
1720                    obj.insert(
1721                        "DbClusterResourceId".to_string(),
1722                        json!(new_cluster_resource_id()),
1723                    );
1724                    obj.remove("ReplicationSourceIdentifier");
1725                    // The new cluster starts empty; the user is expected
1726                    // to call CreateDBInstance with DBClusterIdentifier
1727                    // pointing here, at which point we replay the dump.
1728                    obj.remove("DBClusterMembers");
1729                    obj.remove("WriterDBInstanceIdentifier");
1730                    // Drop snapshot bookkeeping that leaked in via the
1731                    // source-cluster clone path.
1732                    obj.remove("DBClusterSnapshotIdentifier");
1733                    obj.remove("DBClusterSnapshotArn");
1734                    obj.remove("DumpDataB64");
1735                    if let Some(engine) = get_param(req, "Engine") {
1736                        obj.insert("Engine".to_string(), json!(engine));
1737                    }
1738                    if let Some(version) = get_param(req, "EngineVersion") {
1739                        obj.insert("EngineVersion".to_string(), json!(version));
1740                    }
1741                    if let Some(port) = get_param(req, "Port").and_then(|p| p.parse::<i64>().ok()) {
1742                        obj.insert("Port".to_string(), json!(port));
1743                    }
1744                    // Stage the snapshot dump so the next CreateDBInstance
1745                    // joining this cluster replays the data into its
1746                    // fresh container.
1747                    if let Some(b64) = pending_dump_b64 {
1748                        obj.insert("PendingRestoreDumpB64".to_string(), json!(b64));
1749                    }
1750                }
1751                store(&mut state.extras, "clusters").insert(target.clone(), entry.clone());
1752                drop(accounts);
1753                self.emit_event(
1754                    RdsSourceType::DbCluster,
1755                    &target,
1756                    &arn,
1757                    "RDS-EVENT-0170",
1758                    &["creation"],
1759                    "DB cluster restored from snapshot",
1760                );
1761                Ok(xml_response(
1762                    "RestoreDBClusterFromSnapshot",
1763                    format!(
1764                        "    <DBCluster>\n{}\n    </DBCluster>",
1765                        db_cluster_member_xml(&entry)
1766                    ),
1767                    &rid,
1768                ))
1769            }
1770            // Sync metadata-only fallback for RestoreDBClusterToPointInTime;
1771            // the dispatcher in `handle_request` routes the action to the
1772            // async path that also dumps and stages the source writer.
1773            "RestoreDBClusterToPointInTime" => {
1774                let target = get_param(req, "DBClusterIdentifier")
1775                    .ok_or_else(|| missing("DBClusterIdentifier"))?;
1776                let source = get_param(req, "SourceDBClusterIdentifier")
1777                    .ok_or_else(|| missing("SourceDBClusterIdentifier"))?;
1778                let arn = Arn::new("rds", region, &aid, &format!("cluster:{target}")).to_string();
1779                let mut accounts = write_state!();
1780                let state = accounts.get_or_create(&aid);
1781                let mut entry = state
1782                    .extras
1783                    .get("clusters")
1784                    .and_then(|m| m.get(&source))
1785                    .cloned()
1786                    .ok_or_else(|| {
1787                        AwsServiceError::aws_error(
1788                            StatusCode::NOT_FOUND,
1789                            "DBClusterNotFoundFault",
1790                            format!("DBCluster {source} not found."),
1791                        )
1792                    })?;
1793                if let Some(obj) = entry.as_object_mut() {
1794                    obj.insert("DBClusterIdentifier".to_string(), json!(target));
1795                    obj.insert("DBClusterArn".to_string(), json!(arn));
1796                    obj.insert("Status".to_string(), json!("available"));
1797                    obj.insert(
1798                        "Endpoint".to_string(),
1799                        json!(format!("{target}.cluster-xxx.{region}.rds.amazonaws.com")),
1800                    );
1801                    obj.insert(
1802                        "ReaderEndpoint".to_string(),
1803                        json!(format!("{target}.cluster-ro-xxx.{region}.rds.amazonaws.com")),
1804                    );
1805                    obj.insert(
1806                        "DbClusterResourceId".to_string(),
1807                        json!(new_cluster_resource_id()),
1808                    );
1809                    obj.remove("DBClusterMembers");
1810                    obj.remove("WriterDBInstanceIdentifier");
1811                    if let Some(restore_time) = get_param(req, "RestoreToTime") {
1812                        obj.insert("RestoreToTime".to_string(), json!(restore_time));
1813                    }
1814                    if let Some(latest) = get_param(req, "UseLatestRestorableTime") {
1815                        obj.insert("UseLatestRestorableTime".to_string(), json!(latest));
1816                    }
1817                }
1818                store(&mut state.extras, "clusters").insert(target.clone(), entry.clone());
1819                drop(accounts);
1820                self.emit_event(
1821                    RdsSourceType::DbCluster,
1822                    &target,
1823                    &arn,
1824                    "RDS-EVENT-0171",
1825                    &["creation"],
1826                    "DB cluster restored to point in time",
1827                );
1828                Ok(xml_response(
1829                    "RestoreDBClusterToPointInTime",
1830                    format!(
1831                        "    <DBCluster>\n{}\n    </DBCluster>",
1832                        db_cluster_member_xml(&entry)
1833                    ),
1834                    &rid,
1835                ))
1836            }
1837            "RestoreDBClusterFromS3" => Ok(xml_response(
1838                action.as_str(),
1839                "    <DBCluster/>".to_string(),
1840                &rid,
1841            )),
1842
1843            // ── Recommendations ──
1844            "DescribeDBRecommendations" => Ok(xml_response("DescribeDBRecommendations", "    <DBRecommendations/>".to_string(), &rid)),
1845            "ModifyDBRecommendation" => Ok(xml_response("ModifyDBRecommendation", "    <DBRecommendation/>".to_string(), &rid)),
1846
1847            // ── Certificates ──
1848            "DescribeCertificates" => Ok(xml_response("DescribeCertificates", "    <Certificates/>".to_string(), &rid)),
1849            "ModifyCertificates" => {
1850                let cert_id = get_param(req, "CertificateIdentifier");
1851                let remove_override = get_param(req, "RemoveCustomerOverride")
1852                    .map(|v| v.eq_ignore_ascii_case("true"))
1853                    .unwrap_or(false);
1854                let mut accounts = write_state!();
1855                let state = accounts.get_or_create(&aid);
1856                if remove_override {
1857                    state.default_certificate_identifier = None;
1858                } else if let Some(id) = cert_id.clone() {
1859                    state.default_certificate_identifier = Some(id);
1860                }
1861                let echoed = state
1862                    .default_certificate_identifier
1863                    .clone()
1864                    .unwrap_or_default();
1865                Ok(xml_response("ModifyCertificates", format!("    <Certificate>\n      <CertificateIdentifier>{}</CertificateIdentifier>\n      <CustomerOverride>{}</CustomerOverride>\n    </Certificate>", xml_escape(&echoed), !remove_override && cert_id.is_some()), &rid))
1866            }
1867
1868            // ── Account / events / regions / log files / capacity ──
1869            "DescribeAccountAttributes" => Ok(xml_response("DescribeAccountAttributes", "    <AccountQuotas/>".to_string(), &rid)),
1870            "DescribeEventCategories" => Ok(xml_response("DescribeEventCategories", "    <EventCategoriesMapList/>".to_string(), &rid)),
1871            "DescribeEvents" => self.describe_events(req, &rid),
1872            "DescribeSourceRegions" => Ok(xml_response("DescribeSourceRegions", "    <SourceRegions/>".to_string(), &rid)),
1873            "DescribeDBMajorEngineVersions" => Ok(xml_response("DescribeDBMajorEngineVersions", "    <DBMajorEngineVersions/>".to_string(), &rid)),
1874            "DescribeServerlessV2PlatformVersions" => {
1875                let engine = get_param(req, "Engine").unwrap_or_else(|| "aurora-mysql".to_string());
1876                let version_filter = get_param(req, "ServerlessV2PlatformVersion");
1877                let all = [
1878                    ("4", true, "Version 4 offering scaling up to 256 ACUs", 256.0_f64),
1879                    ("3", false, "Version 3 offering scaling up to 256 ACUs", 256.0),
1880                    ("2", false, "Version 2 offering scaling up to 256 ACUs", 256.0),
1881                    ("1", false, "Version 1 offering scaling up to 128 ACUs", 128.0),
1882                ];
1883                let body = all
1884                    .iter()
1885                    .filter(|(v, ..)| version_filter.as_deref().is_none_or(|f| f == *v))
1886                    .map(|(v, is_default, desc, max)| {
1887                        format!(
1888                            "      <member>\n        <Engine>{e}</Engine>\n        <IsDefault>{d}</IsDefault>\n        <ServerlessV2PlatformVersion>{v}</ServerlessV2PlatformVersion>\n        <ServerlessV2PlatformVersionDescription>{desc}</ServerlessV2PlatformVersionDescription>\n        <Status>enabled</Status>\n        <ServerlessV2FeaturesSupport>\n          <MinCapacity>0.0</MinCapacity>\n          <MaxCapacity>{max:.1}</MaxCapacity>\n        </ServerlessV2FeaturesSupport>\n      </member>",
1889                            e = xml_escape(&engine),
1890                            d = is_default,
1891                            v = v,
1892                            desc = xml_escape(desc),
1893                            max = max,
1894                        )
1895                    })
1896                    .collect::<Vec<_>>()
1897                    .join("\n");
1898                Ok(xml_response(
1899                    "DescribeServerlessV2PlatformVersions",
1900                    format!("    <ServerlessV2PlatformVersions>\n{body}\n    </ServerlessV2PlatformVersions>"),
1901                    &rid,
1902                ))
1903            }
1904            "DescribeValidDBInstanceModifications" => Ok(xml_response("DescribeValidDBInstanceModifications", "    <ValidDBInstanceModificationsMessage>\n      <ValidProcessorFeatures/>\n      <Storage/>\n    </ValidDBInstanceModificationsMessage>".to_string(), &rid)),
1905            "ModifyCurrentDBClusterCapacity" => Ok(xml_response("ModifyCurrentDBClusterCapacity", "    <DBClusterIdentifier>x</DBClusterIdentifier>\n    <CurrentCapacity>4</CurrentCapacity>".to_string(), &rid)),
1906            "DisableHttpEndpoint" => Ok(xml_response("DisableHttpEndpoint", "    <HttpEndpointEnabled>false</HttpEndpointEnabled>".to_string(), &rid)),
1907            "EnableHttpEndpoint" => Ok(xml_response("EnableHttpEndpoint", "    <HttpEndpointEnabled>true</HttpEndpointEnabled>".to_string(), &rid)),
1908
1909            _ => Err(AwsServiceError::action_not_implemented("rds", &action)),
1910        }
1911    }
1912}
1913
1914// ── XML helpers per resource ──
1915
1916/// Generate a `DbClusterResourceId` in AWS's `cluster-XXXX` form. The suffix
1917/// is immutable and survives rename, so IAM auth / CloudWatch dimensions key
1918/// on it rather than the cluster identifier.
1919pub(crate) fn new_cluster_resource_id() -> String {
1920    format!("cluster-{}", uuid::Uuid::new_v4().simple())
1921}
1922
1923pub(crate) fn db_cluster_xml(id: &str, arn: &str) -> String {
1924    format!(
1925        "    <DBCluster>\n      <DBClusterIdentifier>{}</DBClusterIdentifier>\n      <DBClusterArn>{}</DBClusterArn>\n      <Status>available</Status>\n    </DBCluster>",
1926        xml_escape(id), xml_escape(arn)
1927    )
1928}
1929
1930pub(crate) fn cluster_snapshot_xml(id: &str, arn: &str, cluster: &str) -> String {
1931    format!(
1932        "    <DBClusterSnapshot>\n      <DBClusterSnapshotIdentifier>{}</DBClusterSnapshotIdentifier>\n      <DBClusterSnapshotArn>{}</DBClusterSnapshotArn>\n      <DBClusterIdentifier>{}</DBClusterIdentifier>\n      <Status>available</Status>\n    </DBClusterSnapshot>",
1933        xml_escape(id), xml_escape(arn), xml_escape(cluster),
1934    )
1935}
1936
1937/// AWS-spec `SourceType` enum values for the `DescribeEvents` filter.
1938/// Anything else triggers `InvalidParameterValue`.
1939const VALID_DESCRIBE_EVENTS_SOURCE_TYPES: &[&str] = &[
1940    "db-instance",
1941    "db-cluster",
1942    "db-parameter-group",
1943    "db-security-group",
1944    "db-snapshot",
1945    "db-cluster-snapshot",
1946    "db-proxy",
1947    "blue-green-deployment",
1948    "custom-engine-version",
1949];
1950
1951impl RdsService {
1952    /// Real DescribeEvents implementation: read the per-account events
1953    /// ring written to by `emit_event`. Honour SourceType /
1954    /// SourceIdentifier / Duration / StartTime / EndTime / EventCategories
1955    /// filters plus MaxRecords / Marker pagination, and emit them as the
1956    /// DescribeEventsResult shape.
1957    pub(crate) fn describe_events(
1958        &self,
1959        req: &AwsRequest,
1960        rid: &str,
1961    ) -> Result<AwsResponse, AwsServiceError> {
1962        let source_type = get_param(req, "SourceType");
1963        if let Some(ref t) = source_type {
1964            if !VALID_DESCRIBE_EVENTS_SOURCE_TYPES.contains(&t.as_str()) {
1965                return Err(AwsServiceError::aws_error(
1966                    StatusCode::BAD_REQUEST,
1967                    "InvalidParameterValue",
1968                    format!("SourceType '{t}' is not a valid value."),
1969                ));
1970            }
1971        }
1972        let source_identifier = get_param(req, "SourceIdentifier");
1973        let event_categories: Vec<String> = (1..=20)
1974            .filter_map(|i| get_param(req, &format!("EventCategories.member.{i}")))
1975            .collect();
1976        let duration_minutes: i64 = get_param(req, "Duration")
1977            .and_then(|s| s.parse().ok())
1978            .unwrap_or(60);
1979        let now = chrono::Utc::now();
1980        let start_time = get_param(req, "StartTime")
1981            .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
1982            .map(|dt| dt.with_timezone(&chrono::Utc))
1983            .unwrap_or_else(|| now - chrono::Duration::minutes(duration_minutes));
1984        let end_time = get_param(req, "EndTime")
1985            .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
1986            .map(|dt| dt.with_timezone(&chrono::Utc))
1987            .unwrap_or(now);
1988
1989        let state = self.state_handle().read();
1990        let mut events = state
1991            .get(&req.account_id)
1992            .map(|s| s.events.clone())
1993            .unwrap_or_default();
1994        drop(state);
1995
1996        // AWS returns events ordered by `Date` ascending (oldest first).
1997        events.sort_by_key(|e| e.date);
1998
1999        let filtered: Vec<crate::state::RdsEventRecord> = events
2000            .into_iter()
2001            .filter(|e| {
2002                source_type.as_deref().is_none_or(|t| e.source_type == t)
2003                    && source_identifier
2004                        .as_deref()
2005                        .is_none_or(|i| e.source_identifier == i)
2006                    && (event_categories.is_empty()
2007                        || event_categories
2008                            .iter()
2009                            .any(|c| e.event_categories.iter().any(|ec| ec == c)))
2010                    && e.date >= start_time
2011                    && e.date <= end_time
2012            })
2013            .collect();
2014
2015        // MaxRecords (1..=100, default 100) and Marker pagination. We key
2016        // the marker by the event's RFC3339 timestamp + identifier so
2017        // duplicate dates still paginate deterministically.
2018        let max_records: usize = match get_param(req, "MaxRecords") {
2019            Some(raw) => {
2020                let parsed: i32 = raw.parse().map_err(|_| {
2021                    AwsServiceError::aws_error(
2022                        StatusCode::BAD_REQUEST,
2023                        "InvalidParameterValue",
2024                        "MaxRecords must be a valid integer.",
2025                    )
2026                })?;
2027                if !(1..=100).contains(&parsed) {
2028                    return Err(AwsServiceError::aws_error(
2029                        StatusCode::BAD_REQUEST,
2030                        "InvalidParameterValue",
2031                        "MaxRecords must be between 1 and 100.",
2032                    ));
2033                }
2034                parsed as usize
2035            }
2036            None => 100,
2037        };
2038
2039        let start_index = match get_param(req, "Marker") {
2040            Some(marker) => marker.parse::<usize>().map_err(|_| {
2041                AwsServiceError::aws_error(
2042                    StatusCode::BAD_REQUEST,
2043                    "InvalidParameterValue",
2044                    "Marker is invalid.",
2045                )
2046            })?,
2047            None => 0,
2048        };
2049        let end_index = std::cmp::min(start_index.saturating_add(max_records), filtered.len());
2050        let next_marker = if end_index < filtered.len() {
2051            Some(end_index.to_string())
2052        } else {
2053            None
2054        };
2055        let page = filtered.get(start_index..end_index).unwrap_or(&[]);
2056
2057        let mut body = String::new();
2058        if let Some(m) = next_marker {
2059            body.push_str(&format!("    <Marker>{}</Marker>\n", xml_escape(&m)));
2060        }
2061        body.push_str("    <Events>\n");
2062        for e in page {
2063            body.push_str("      <Event>\n");
2064            body.push_str(&format!(
2065                "        <SourceIdentifier>{}</SourceIdentifier>\n",
2066                xml_escape(&e.source_identifier),
2067            ));
2068            body.push_str(&format!(
2069                "        <SourceType>{}</SourceType>\n",
2070                xml_escape(&e.source_type),
2071            ));
2072            body.push_str(&format!(
2073                "        <Message>{}</Message>\n",
2074                xml_escape(&e.message),
2075            ));
2076            body.push_str(&format!(
2077                "        <SourceArn>{}</SourceArn>\n",
2078                xml_escape(&e.source_arn),
2079            ));
2080            body.push_str("        <EventCategories>\n");
2081            for cat in &e.event_categories {
2082                body.push_str(&format!(
2083                    "          <EventCategory>{}</EventCategory>\n",
2084                    xml_escape(cat),
2085                ));
2086            }
2087            body.push_str("        </EventCategories>\n");
2088            body.push_str(&format!("        <Date>{}</Date>\n", e.date.to_rfc3339(),));
2089            body.push_str("      </Event>\n");
2090        }
2091        body.push_str("    </Events>");
2092        Ok(xml_response("DescribeEvents", body, rid))
2093    }
2094}
2095
2096mod cluster_actions;
2097mod parse;
2098#[cfg(test)]
2099mod tests;
2100mod xml_renderers;
2101use cluster_actions::*;
2102use parse::*;
2103use xml_renderers::*;