Skip to main content

fakecloud_cloudwatch/
service.rs

1use std::collections::{BTreeMap, HashMap};
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use http::StatusCode;
6
7use fakecloud_core::query::{
8    optional_query_param, query_metadata_only_xml, query_response_xml, required_query_param,
9};
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11
12use std::sync::Arc;
13
14use fakecloud_persistence::SnapshotStore;
15use tokio::sync::Mutex;
16
17use crate::state::{
18    AlarmState, CloudWatchSnapshot, Dashboard, MetricAlarm, MetricDatum, SharedCloudWatchState,
19    StatisticSet, CLOUDWATCH_SNAPSHOT_SCHEMA_VERSION,
20};
21
/// Namespace URI handed to the query-protocol XML builders for every
/// response this service renders.
const NS: &str = "http://monitoring.amazonaws.com/doc/2010-08-01/";
23
/// Every action name that `CloudWatchService::handle` dispatches.
///
/// This list is what `supported_actions()` reports, so it must stay in step
/// with the `match` in `handle`. The four dashboard actions are routed there
/// and therefore have to be advertised here as well.
const SUPPORTED_ACTIONS: &[&str] = &[
    "PutMetricData",
    "GetMetricStatistics",
    "GetMetricData",
    "ListMetrics",
    "PutMetricAlarm",
    "DescribeAlarms",
    "DescribeAlarmsForMetric",
    "DeleteAlarms",
    "EnableAlarmActions",
    "DisableAlarmActions",
    "SetAlarmState",
    "DescribeAlarmHistory",
    "PutDashboard",
    "GetDashboard",
    "DeleteDashboards",
    "ListDashboards",
];
38
/// CloudWatch service front-end: routes query-protocol actions onto shared
/// in-memory state and, when a store is attached, snapshots that state after
/// successful mutations.
pub struct CloudWatchService {
    /// Shared CloudWatch state, looked up by account id and region in the
    /// handlers.
    state: SharedCloudWatchState,
    /// Destination for serialized snapshots; `None` disables persistence.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Held for the whole of `save_snapshot` so snapshot writes do not
    /// overlap.
    snapshot_lock: Arc<Mutex<()>>,
}
44
45impl CloudWatchService {
46    pub fn new(state: SharedCloudWatchState) -> Self {
47        Self {
48            state,
49            snapshot_store: None,
50            snapshot_lock: Arc::new(Mutex::new(())),
51        }
52    }
53
54    /// Attach a `SnapshotStore` so alarms / dashboards / metrics survive
55    /// restarts. Without this, all CloudWatch state is in-memory only —
56    /// alarms wired to actions fire on a freshly-started process.
57    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
58        self.snapshot_store = Some(store);
59        self
60    }
61
62    /// Persist current state as a snapshot. Cloned + serialized under
63    /// the snapshot lock so concurrent mutators can't race a stale-last
64    /// write.
65    pub(crate) async fn save_snapshot(&self) {
66        let Some(store) = self.snapshot_store.clone() else {
67            return;
68        };
69        let _guard = self.snapshot_lock.lock().await;
70        let snapshot = CloudWatchSnapshot {
71            schema_version: CLOUDWATCH_SNAPSHOT_SCHEMA_VERSION,
72            accounts: self.state.read().clone_for_snapshot(),
73        };
74        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
75            let bytes = serde_json::to_vec(&snapshot)
76                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
77            store.save(&bytes)
78        })
79        .await;
80        match join {
81            Ok(Ok(())) => {}
82            Ok(Err(err)) => tracing::error!(%err, "failed to write cloudwatch snapshot"),
83            Err(err) => tracing::error!(%err, "cloudwatch snapshot task panicked"),
84        }
85    }
86}
87
88#[async_trait]
89impl AwsService for CloudWatchService {
90    fn service_name(&self) -> &str {
91        "monitoring"
92    }
93
94    fn supported_actions(&self) -> &[&str] {
95        SUPPORTED_ACTIONS
96    }
97
98    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
99        let mutates = matches!(
100            req.action.as_str(),
101            "PutMetricData"
102                | "PutMetricAlarm"
103                | "DeleteAlarms"
104                | "EnableAlarmActions"
105                | "DisableAlarmActions"
106                | "SetAlarmState"
107                | "PutDashboard"
108                | "DeleteDashboards"
109        );
110        let result = match req.action.as_str() {
111            "PutMetricData" => self.put_metric_data(&req),
112            "GetMetricStatistics" => self.get_metric_statistics(&req),
113            "GetMetricData" => self.get_metric_data(&req),
114            "ListMetrics" => self.list_metrics(&req),
115            "PutMetricAlarm" => self.put_metric_alarm(&req),
116            "DescribeAlarms" => self.describe_alarms(&req),
117            "DescribeAlarmsForMetric" => self.describe_alarms_for_metric(&req),
118            "DeleteAlarms" => self.delete_alarms(&req),
119            "EnableAlarmActions" => self.enable_alarm_actions(&req),
120            "DisableAlarmActions" => self.disable_alarm_actions(&req),
121            "SetAlarmState" => self.set_alarm_state(&req),
122            "DescribeAlarmHistory" => self.describe_alarm_history(&req),
123            "PutDashboard" => self.put_dashboard(&req),
124            "GetDashboard" => self.get_dashboard(&req),
125            "DeleteDashboards" => self.delete_dashboards(&req),
126            "ListDashboards" => self.list_dashboards(&req),
127            _ => Err(AwsServiceError::action_not_implemented(
128                "monitoring",
129                &req.action,
130            )),
131        };
132        if mutates && result.is_ok() {
133            self.save_snapshot().await;
134        }
135        result
136    }
137}
138
139fn xml_response(action: &str, inner: &str, request_id: &str) -> AwsResponse {
140    AwsResponse::xml(
141        StatusCode::OK,
142        query_response_xml(action, NS, inner, request_id),
143    )
144}
145
146fn empty_metadata_response(action: &str, request_id: &str) -> AwsResponse {
147    AwsResponse::xml(
148        StatusCode::OK,
149        query_metadata_only_xml(action, NS, request_id),
150    )
151}
152
153fn invalid_param(message: impl Into<String>) -> AwsServiceError {
154    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "InvalidParameterValue", message)
155}
156
157fn collect_indexed(req: &AwsRequest, prefix: &str) -> Vec<HashMap<String, String>> {
158    let mut by_index: BTreeMap<u32, HashMap<String, String>> = BTreeMap::new();
159    let needle = format!("{prefix}.member.");
160    for (k, v) in req.query_params.iter() {
161        let Some(rest) = k.strip_prefix(&needle) else {
162            continue;
163        };
164        let mut parts = rest.splitn(2, '.');
165        let Some(idx_str) = parts.next() else {
166            continue;
167        };
168        let Ok(idx) = idx_str.parse::<u32>() else {
169            continue;
170        };
171        let field = parts.next().unwrap_or("").to_string();
172        by_index.entry(idx).or_default().insert(field, v.clone());
173    }
174    by_index.into_values().collect()
175}
176
/// Extracts dimensions from an already-grouped member map.
///
/// Looks for `{prefix}.member.N.Name` / `{prefix}.member.N.Value` pairs in
/// `member` and returns them as a name → value map. An index that lacks
/// either half is dropped; when two indices carry the same name, the higher
/// index wins. Keys with a non-numeric index are ignored.
fn parse_dimensions(member: &HashMap<String, String>, prefix: &str) -> BTreeMap<String, String> {
    let member_prefix = format!("{prefix}.member.");
    let mut slots: BTreeMap<u32, (Option<String>, Option<String>)> = BTreeMap::new();
    for (key, raw) in member {
        let Some(tail) = key.strip_prefix(&member_prefix) else {
            continue;
        };
        let (index_text, field) = tail.split_once('.').unwrap_or((tail, ""));
        let Ok(index) = index_text.parse::<u32>() else {
            continue;
        };
        let slot = slots.entry(index).or_default();
        if field == "Name" {
            slot.0 = Some(raw.clone());
        } else if field == "Value" {
            slot.1 = Some(raw.clone());
        }
    }

    // Walk indices in ascending order so later indices overwrite earlier ones.
    let mut resolved = BTreeMap::new();
    for (name, value) in slots.into_values() {
        if let Some((name, value)) = name.zip(value) {
            resolved.insert(name, value);
        }
    }
    resolved
}
207
208fn parse_dimensions_query(req: &AwsRequest, prefix: &str) -> BTreeMap<String, String> {
209    let mut dims: BTreeMap<u32, (Option<String>, Option<String>)> = BTreeMap::new();
210    let needle = format!("{prefix}.member.");
211    for (k, v) in req.query_params.iter() {
212        let Some(rest) = k.strip_prefix(&needle) else {
213            continue;
214        };
215        let mut parts = rest.splitn(2, '.');
216        let Some(idx_str) = parts.next() else {
217            continue;
218        };
219        let Ok(idx) = idx_str.parse::<u32>() else {
220            continue;
221        };
222        let field = parts.next().unwrap_or("");
223        let entry = dims.entry(idx).or_default();
224        match field {
225            "Name" => entry.0 = Some(v.clone()),
226            "Value" => entry.1 = Some(v.clone()),
227            _ => {}
228        }
229    }
230    let mut out = BTreeMap::new();
231    for (_, (name, value)) in dims {
232        if let (Some(n), Some(v)) = (name, value) {
233            out.insert(n, v);
234        }
235    }
236    out
237}
238
/// Escapes the five XML special characters (`&`, `<`, `>`, `"`, `'`) in `s`
/// with their predefined entities; every other character is copied as-is.
fn xml_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}
246
/// Per-datapoint aggregation summary covering both the simple `Value` form
/// and the `StatisticValues` form so callers don't lose the count or
/// min/max baked into a `StatisticSet`.
#[derive(Clone, Copy)]
struct DatumStats {
    /// Sum of the samples folded into this summary.
    sum: f64,
    /// Smallest sample seen so far.
    min: f64,
    /// Largest sample seen so far.
    max: f64,
    /// Number of samples; kept as `f64` because `StatisticSet::sample_count`
    /// is copied into it directly.
    count: f64,
}
257
258fn datum_stats(d: &MetricDatum) -> Option<DatumStats> {
259    if let Some(v) = d.value {
260        return Some(DatumStats {
261            sum: v,
262            min: v,
263            max: v,
264            count: 1.0,
265        });
266    }
267    if let Some(s) = &d.statistic_values {
268        return Some(DatumStats {
269            sum: s.sum,
270            min: s.minimum,
271            max: s.maximum,
272            count: s.sample_count,
273        });
274    }
275    None
276}
277
278fn merge_stats(acc: &mut DatumStats, other: DatumStats) {
279    acc.sum += other.sum;
280    acc.count += other.count;
281    if other.min < acc.min {
282        acc.min = other.min;
283    }
284    if other.max > acc.max {
285        acc.max = other.max;
286    }
287}
288
289fn stat_value(stat: &str, agg: DatumStats) -> Option<f64> {
290    match stat {
291        "Sum" => Some(agg.sum),
292        "Average" => {
293            if agg.count > 0.0 {
294                Some(agg.sum / agg.count)
295            } else {
296                None
297            }
298        }
299        "Minimum" => Some(agg.min),
300        "Maximum" => Some(agg.max),
301        "SampleCount" => Some(agg.count),
302        _ => None,
303    }
304}
305
306fn render_dimensions(dims: &BTreeMap<String, String>) -> String {
307    let mut s = String::from("<Dimensions>");
308    for (name, value) in dims.iter() {
309        s.push_str(&format!(
310            "<member><Name>{}</Name><Value>{}</Value></member>",
311            xml_escape(name),
312            xml_escape(value),
313        ));
314    }
315    s.push_str("</Dimensions>");
316    s
317}
318
319impl CloudWatchService {
320    fn put_metric_data(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
321        let namespace = required_query_param(req, "Namespace")?;
322        let members = collect_indexed(req, "MetricData");
323        if members.is_empty() {
324            return Err(invalid_param(
325                "PutMetricData requires at least one MetricData entry",
326            ));
327        }
328
329        let now = Utc::now();
330        let mut state = self.state.write();
331        let acct = state.get_or_create(&req.account_id);
332        let metrics_map = acct.metrics_in_mut(&req.region);
333        let bucket = metrics_map.entry(namespace.clone()).or_default();
334
335        for member in members {
336            let metric_name = member
337                .get("MetricName")
338                .cloned()
339                .ok_or_else(|| invalid_param("MetricData.member.N.MetricName is required"))?;
340            let value = member
341                .get("Value")
342                .map(|s| s.parse::<f64>())
343                .transpose()
344                .map_err(|_| invalid_param("Value must be a valid number"))?;
345            let timestamp = member
346                .get("Timestamp")
347                .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
348                .map(|d| d.with_timezone(&Utc))
349                .unwrap_or(now);
350            let unit = member.get("Unit").cloned();
351            let storage_resolution = member
352                .get("StorageResolution")
353                .and_then(|s| s.parse::<i64>().ok());
354            let dimensions = parse_dimensions(&member, "Dimensions");
355
356            let statistic_values = if let (Some(sc), Some(sum), Some(min), Some(max)) = (
357                member.get("StatisticValues.SampleCount"),
358                member.get("StatisticValues.Sum"),
359                member.get("StatisticValues.Minimum"),
360                member.get("StatisticValues.Maximum"),
361            ) {
362                Some(StatisticSet {
363                    sample_count: sc.parse::<f64>().map_err(|_| {
364                        invalid_param("StatisticValues.SampleCount must be a number")
365                    })?,
366                    sum: sum
367                        .parse::<f64>()
368                        .map_err(|_| invalid_param("StatisticValues.Sum must be a number"))?,
369                    minimum: min
370                        .parse::<f64>()
371                        .map_err(|_| invalid_param("StatisticValues.Minimum must be a number"))?,
372                    maximum: max
373                        .parse::<f64>()
374                        .map_err(|_| invalid_param("StatisticValues.Maximum must be a number"))?,
375                })
376            } else {
377                None
378            };
379
380            if value.is_none() && statistic_values.is_none() {
381                return Err(invalid_param(
382                    "MetricData entry must supply either Value or StatisticValues",
383                ));
384            }
385
386            bucket.push(MetricDatum {
387                metric_name,
388                dimensions,
389                timestamp,
390                value,
391                statistic_values,
392                unit,
393                storage_resolution,
394            });
395        }
396
397        Ok(empty_metadata_response("PutMetricData", &req.request_id))
398    }
399
400    fn list_metrics(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
401        let namespace = optional_query_param(req, "Namespace");
402        let metric_name = optional_query_param(req, "MetricName");
403        let dim_filter = parse_dimensions_query(req, "Dimensions");
404
405        let state = self.state.read();
406        let mut out = String::from("<Metrics>");
407        if let Some(acct) = state.get(&req.account_id) {
408            if let Some(map) = acct.metrics_in(&req.region) {
409                for (ns, data) in map.iter() {
410                    if let Some(filter_ns) = namespace.as_ref() {
411                        if ns != filter_ns {
412                            continue;
413                        }
414                    }
415                    let mut seen: BTreeMap<(String, BTreeMap<String, String>), ()> =
416                        BTreeMap::new();
417                    for d in data.iter() {
418                        if let Some(filter_name) = metric_name.as_ref() {
419                            if &d.metric_name != filter_name {
420                                continue;
421                            }
422                        }
423                        if !dim_filter.is_empty()
424                            && !dim_filter
425                                .iter()
426                                .all(|(k, v)| d.dimensions.get(k) == Some(v))
427                        {
428                            continue;
429                        }
430                        seen.insert((d.metric_name.clone(), d.dimensions.clone()), ());
431                    }
432                    for ((name, dims), _) in seen {
433                        out.push_str("<member>");
434                        out.push_str(&format!("<Namespace>{}</Namespace>", xml_escape(ns)));
435                        out.push_str(&format!("<MetricName>{}</MetricName>", xml_escape(&name)));
436                        out.push_str(&render_dimensions(&dims));
437                        out.push_str("</member>");
438                    }
439                }
440            }
441        }
442        out.push_str("</Metrics>");
443
444        Ok(xml_response("ListMetrics", &out, &req.request_id))
445    }
446
447    fn get_metric_statistics(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
448        let namespace = required_query_param(req, "Namespace")?;
449        let metric_name = required_query_param(req, "MetricName")?;
450        let start = required_query_param(req, "StartTime")?;
451        let end = required_query_param(req, "EndTime")?;
452        let period = required_query_param(req, "Period")?
453            .parse::<i64>()
454            .map_err(|_| invalid_param("Period must be an integer"))?;
455        if period <= 0 {
456            return Err(invalid_param("Period must be positive"));
457        }
458        let start_ts = DateTime::parse_from_rfc3339(&start)
459            .map_err(|_| invalid_param("StartTime must be ISO 8601"))?
460            .with_timezone(&Utc);
461        let end_ts = DateTime::parse_from_rfc3339(&end)
462            .map_err(|_| invalid_param("EndTime must be ISO 8601"))?
463            .with_timezone(&Utc);
464
465        let mut statistics: Vec<String> = Vec::new();
466        for (k, v) in req.query_params.iter() {
467            if k.starts_with("Statistics.member.") {
468                statistics.push(v.clone());
469            }
470        }
471        if statistics.is_empty() {
472            return Err(invalid_param("At least one Statistic is required"));
473        }
474
475        let dim_filter = parse_dimensions_query(req, "Dimensions");
476
477        let state = self.state.read();
478        let mut datapoints: Vec<(DateTime<Utc>, BTreeMap<String, f64>)> = Vec::new();
479        if let Some(acct) = state.get(&req.account_id) {
480            if let Some(map) = acct.metrics_in(&req.region) {
481                if let Some(data) = map.get(&namespace) {
482                    let mut buckets: BTreeMap<DateTime<Utc>, DatumStats> = BTreeMap::new();
483                    for d in data.iter() {
484                        if d.metric_name != metric_name {
485                            continue;
486                        }
487                        if !dim_filter
488                            .iter()
489                            .all(|(k, v)| d.dimensions.get(k) == Some(v))
490                        {
491                            continue;
492                        }
493                        if d.timestamp < start_ts || d.timestamp >= end_ts {
494                            continue;
495                        }
496                        let Some(stats) = datum_stats(d) else {
497                            continue;
498                        };
499                        let secs = d.timestamp.timestamp();
500                        let bucket_secs = secs - secs.rem_euclid(period);
501                        let bucket_ts =
502                            DateTime::<Utc>::from_timestamp(bucket_secs, 0).unwrap_or(d.timestamp);
503                        buckets
504                            .entry(bucket_ts)
505                            .and_modify(|acc| merge_stats(acc, stats))
506                            .or_insert(stats);
507                    }
508                    for (ts, agg) in buckets {
509                        let mut stats = BTreeMap::new();
510                        for stat in statistics.iter() {
511                            if let Some(v) = stat_value(stat, agg) {
512                                stats.insert(stat.clone(), v);
513                            }
514                        }
515                        datapoints.push((ts, stats));
516                    }
517                }
518            }
519        }
520
521        let mut inner = format!("<Label>{}</Label>", xml_escape(&metric_name));
522        inner.push_str("<Datapoints>");
523        for (ts, stats) in datapoints {
524            inner.push_str("<member>");
525            inner.push_str(&format!(
526                "<Timestamp>{}</Timestamp>",
527                ts.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
528            ));
529            for (name, value) in stats {
530                inner.push_str(&format!("<{name}>{value}</{name}>"));
531            }
532            inner.push_str("</member>");
533        }
534        inner.push_str("</Datapoints>");
535
536        Ok(xml_response("GetMetricStatistics", &inner, &req.request_id))
537    }
538
539    fn get_metric_data(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
540        let start = required_query_param(req, "StartTime")?;
541        let end = required_query_param(req, "EndTime")?;
542        let start_ts = DateTime::parse_from_rfc3339(&start)
543            .map_err(|_| invalid_param("StartTime must be ISO 8601"))?
544            .with_timezone(&Utc);
545        let end_ts = DateTime::parse_from_rfc3339(&end)
546            .map_err(|_| invalid_param("EndTime must be ISO 8601"))?
547            .with_timezone(&Utc);
548
549        let queries = collect_indexed(req, "MetricDataQueries");
550        if queries.is_empty() {
551            return Err(invalid_param(
552                "MetricDataQueries must contain at least one entry",
553            ));
554        }
555
556        let state = self.state.read();
557        let mut inner = String::from("<MetricDataResults>");
558        for q in queries {
559            let id = q.get("Id").cloned().unwrap_or_default();
560            let label = q.get("Label").cloned().unwrap_or_else(|| id.clone());
561            let stat = q
562                .get("MetricStat.Stat")
563                .cloned()
564                .unwrap_or_else(|| "Sum".to_string());
565            let metric_name = q.get("MetricStat.Metric.MetricName").cloned();
566            let namespace = q.get("MetricStat.Metric.Namespace").cloned();
567            let period: i64 = q
568                .get("MetricStat.Period")
569                .and_then(|s| s.parse::<i64>().ok())
570                .unwrap_or(60);
571            if period <= 0 {
572                return Err(invalid_param(
573                    "MetricStat.Period must be a positive integer",
574                ));
575            }
576            let dim_filter = parse_dimensions(&q, "MetricStat.Metric.Dimensions");
577
578            let (mut timestamps, mut values): (Vec<String>, Vec<f64>) = (Vec::new(), Vec::new());
579            if let (Some(metric_name), Some(namespace)) = (metric_name, namespace) {
580                if let Some(acct) = state.get(&req.account_id) {
581                    if let Some(map) = acct.metrics_in(&req.region) {
582                        if let Some(data) = map.get(&namespace) {
583                            let mut buckets: BTreeMap<DateTime<Utc>, DatumStats> = BTreeMap::new();
584                            for d in data.iter() {
585                                if d.metric_name != metric_name {
586                                    continue;
587                                }
588                                if !dim_filter
589                                    .iter()
590                                    .all(|(k, v)| d.dimensions.get(k) == Some(v))
591                                {
592                                    continue;
593                                }
594                                if d.timestamp < start_ts || d.timestamp >= end_ts {
595                                    continue;
596                                }
597                                let Some(stats) = datum_stats(d) else {
598                                    continue;
599                                };
600                                let secs = d.timestamp.timestamp();
601                                let bucket_secs = secs - secs.rem_euclid(period);
602                                let bucket_ts = DateTime::<Utc>::from_timestamp(bucket_secs, 0)
603                                    .unwrap_or(d.timestamp);
604                                buckets
605                                    .entry(bucket_ts)
606                                    .and_modify(|acc| merge_stats(acc, stats))
607                                    .or_insert(stats);
608                            }
609                            for (ts, agg) in buckets {
610                                let Some(v) = stat_value(&stat, agg) else {
611                                    continue;
612                                };
613                                timestamps
614                                    .push(ts.to_rfc3339_opts(chrono::SecondsFormat::Millis, true));
615                                values.push(v);
616                            }
617                        }
618                    }
619                }
620            }
621
622            inner.push_str("<member>");
623            inner.push_str(&format!("<Id>{}</Id>", xml_escape(&id)));
624            inner.push_str(&format!("<Label>{}</Label>", xml_escape(&label)));
625            inner.push_str("<StatusCode>Complete</StatusCode>");
626            inner.push_str("<Timestamps>");
627            for ts in timestamps {
628                inner.push_str(&format!("<member>{ts}</member>"));
629            }
630            inner.push_str("</Timestamps>");
631            inner.push_str("<Values>");
632            for v in values {
633                inner.push_str(&format!("<member>{v}</member>"));
634            }
635            inner.push_str("</Values>");
636            inner.push_str("</member>");
637        }
638        inner.push_str("</MetricDataResults>");
639        inner.push_str("<Messages></Messages>");
640
641        Ok(xml_response("GetMetricData", &inner, &req.request_id))
642    }
643
644    fn put_metric_alarm(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
645        let alarm_name = required_query_param(req, "AlarmName")?;
646        let comparison = required_query_param(req, "ComparisonOperator")?;
647        let evaluation_periods = required_query_param(req, "EvaluationPeriods")?
648            .parse::<i64>()
649            .map_err(|_| invalid_param("EvaluationPeriods must be an integer"))?;
650
651        let alarm_description = optional_query_param(req, "AlarmDescription");
652        let actions_enabled = optional_query_param(req, "ActionsEnabled")
653            .map(|s| s.eq_ignore_ascii_case("true"))
654            .unwrap_or(true);
655
656        let metric_name = optional_query_param(req, "MetricName");
657        let namespace = optional_query_param(req, "Namespace");
658        let statistic = optional_query_param(req, "Statistic");
659        let extended_statistic = optional_query_param(req, "ExtendedStatistic");
660        let period = optional_query_param(req, "Period").and_then(|s| s.parse::<i64>().ok());
661        let unit = optional_query_param(req, "Unit");
662        let datapoints_to_alarm =
663            optional_query_param(req, "DatapointsToAlarm").and_then(|s| s.parse::<i64>().ok());
664        let threshold = optional_query_param(req, "Threshold").and_then(|s| s.parse::<f64>().ok());
665        let treat_missing_data = optional_query_param(req, "TreatMissingData");
666        let evaluate_low_sample_count_percentile =
667            optional_query_param(req, "EvaluateLowSampleCountPercentile");
668        let dimensions = parse_dimensions_query(req, "Dimensions");
669
670        let mut ok_actions = Vec::new();
671        let mut alarm_actions = Vec::new();
672        let mut insufficient_data_actions = Vec::new();
673        for (k, v) in req.query_params.iter() {
674            if k.starts_with("OKActions.member.") {
675                ok_actions.push(v.clone());
676            } else if k.starts_with("AlarmActions.member.") {
677                alarm_actions.push(v.clone());
678            } else if k.starts_with("InsufficientDataActions.member.") {
679                insufficient_data_actions.push(v.clone());
680            }
681        }
682
683        let arn = format!(
684            "arn:aws:cloudwatch:{}:{}:alarm:{}",
685            req.region, req.account_id, alarm_name
686        );
687        let now = Utc::now();
688
689        let mut state = self.state.write();
690        let acct = state.get_or_create(&req.account_id);
691        let alarms = acct.alarms_in_mut(&req.region);
692        let existing = alarms.get(&alarm_name).cloned();
693        let alarm = MetricAlarm {
694            alarm_name: alarm_name.clone(),
695            alarm_arn: arn,
696            alarm_description,
697            actions_enabled,
698            ok_actions,
699            alarm_actions,
700            insufficient_data_actions,
701            state_value: existing
702                .as_ref()
703                .map(|a| a.state_value)
704                .unwrap_or(AlarmState::InsufficientData),
705            state_reason: existing
706                .as_ref()
707                .map(|a| a.state_reason.clone())
708                .unwrap_or_else(|| "Unchecked: Initial alarm creation".to_string()),
709            state_updated_timestamp: existing
710                .as_ref()
711                .map(|a| a.state_updated_timestamp)
712                .unwrap_or(now),
713            metric_name,
714            namespace,
715            statistic,
716            extended_statistic,
717            dimensions,
718            period,
719            unit,
720            evaluation_periods,
721            datapoints_to_alarm,
722            threshold,
723            comparison_operator: comparison,
724            treat_missing_data,
725            evaluate_low_sample_count_percentile,
726            configuration_updated_timestamp: existing
727                .as_ref()
728                .map(|a| a.configuration_updated_timestamp)
729                .unwrap_or(now),
730            alarm_configuration_updated_timestamp: now,
731        };
732        alarms.insert(alarm_name, alarm);
733
734        Ok(empty_metadata_response("PutMetricAlarm", &req.request_id))
735    }
736
737    fn describe_alarms(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
738        let mut filter_names: Vec<String> = Vec::new();
739        for (k, v) in req.query_params.iter() {
740            if k.starts_with("AlarmNames.member.") {
741                filter_names.push(v.clone());
742            }
743        }
744        let prefix = optional_query_param(req, "AlarmNamePrefix");
745        let state_filter = optional_query_param(req, "StateValue");
746        let action_prefix = optional_query_param(req, "ActionPrefix");
747
748        let state = self.state.read();
749        let mut inner = String::from("<MetricAlarms>");
750        if let Some(acct) = state.get(&req.account_id) {
751            if let Some(alarms) = acct.alarms_in(&req.region) {
752                for alarm in alarms.values() {
753                    if !filter_names.is_empty() && !filter_names.contains(&alarm.alarm_name) {
754                        continue;
755                    }
756                    if let Some(p) = prefix.as_ref() {
757                        if !alarm.alarm_name.starts_with(p) {
758                            continue;
759                        }
760                    }
761                    if let Some(sv) = state_filter.as_ref() {
762                        if alarm.state_value.as_str() != sv {
763                            continue;
764                        }
765                    }
766                    if let Some(ap) = action_prefix.as_ref() {
767                        let any = alarm
768                            .alarm_actions
769                            .iter()
770                            .chain(alarm.ok_actions.iter())
771                            .chain(alarm.insufficient_data_actions.iter())
772                            .any(|a| a.starts_with(ap));
773                        if !any {
774                            continue;
775                        }
776                    }
777                    inner.push_str(&render_alarm(alarm));
778                }
779            }
780        }
781        inner.push_str("</MetricAlarms>");
782        inner.push_str("<CompositeAlarms></CompositeAlarms>");
783
784        Ok(xml_response("DescribeAlarms", &inner, &req.request_id))
785    }
786
787    fn describe_alarms_for_metric(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
788        let metric_name = required_query_param(req, "MetricName")?;
789        let namespace = required_query_param(req, "Namespace")?;
790        let dim_filter = parse_dimensions_query(req, "Dimensions");
791
792        let state = self.state.read();
793        let mut inner = String::from("<MetricAlarms>");
794        if let Some(acct) = state.get(&req.account_id) {
795            if let Some(alarms) = acct.alarms_in(&req.region) {
796                for alarm in alarms.values() {
797                    if alarm.metric_name.as_deref() != Some(&metric_name) {
798                        continue;
799                    }
800                    if alarm.namespace.as_deref() != Some(&namespace) {
801                        continue;
802                    }
803                    if !dim_filter.is_empty() && alarm.dimensions != dim_filter {
804                        continue;
805                    }
806                    inner.push_str(&render_alarm(alarm));
807                }
808            }
809        }
810        inner.push_str("</MetricAlarms>");
811
812        Ok(xml_response(
813            "DescribeAlarmsForMetric",
814            &inner,
815            &req.request_id,
816        ))
817    }
818
819    fn delete_alarms(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
820        let mut names: Vec<String> = Vec::new();
821        for (k, v) in req.query_params.iter() {
822            if k.starts_with("AlarmNames.member.") {
823                names.push(v.clone());
824            }
825        }
826        if names.is_empty() {
827            return Err(invalid_param("AlarmNames must contain at least one name"));
828        }
829
830        let mut state = self.state.write();
831        let acct = state.get_or_create(&req.account_id);
832        let alarms = acct.alarms_in_mut(&req.region);
833        for name in names {
834            alarms.remove(&name);
835        }
836
837        Ok(empty_metadata_response("DeleteAlarms", &req.request_id))
838    }
839
840    fn enable_alarm_actions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
841        self.toggle_alarm_actions(req, true, "EnableAlarmActions")
842    }
843
844    fn disable_alarm_actions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
845        self.toggle_alarm_actions(req, false, "DisableAlarmActions")
846    }
847
848    fn toggle_alarm_actions(
849        &self,
850        req: &AwsRequest,
851        enabled: bool,
852        action_name: &str,
853    ) -> Result<AwsResponse, AwsServiceError> {
854        let mut names: Vec<String> = Vec::new();
855        for (k, v) in req.query_params.iter() {
856            if k.starts_with("AlarmNames.member.") {
857                names.push(v.clone());
858            }
859        }
860        let mut state = self.state.write();
861        let acct = state.get_or_create(&req.account_id);
862        let alarms = acct.alarms_in_mut(&req.region);
863        for name in names {
864            if let Some(alarm) = alarms.get_mut(&name) {
865                alarm.actions_enabled = enabled;
866                alarm.alarm_configuration_updated_timestamp = Utc::now();
867            }
868        }
869        Ok(empty_metadata_response(action_name, &req.request_id))
870    }
871
872    fn set_alarm_state(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
873        let alarm_name = required_query_param(req, "AlarmName")?;
874        let state_value = required_query_param(req, "StateValue")?;
875        let state_reason = required_query_param(req, "StateReason")?;
876        let new_state = AlarmState::parse(&state_value)
877            .ok_or_else(|| invalid_param("StateValue must be OK | ALARM | INSUFFICIENT_DATA"))?;
878
879        let mut state = self.state.write();
880        let acct = state.get_or_create(&req.account_id);
881        let alarms = acct.alarms_in_mut(&req.region);
882        let alarm = alarms.get_mut(&alarm_name).ok_or_else(|| {
883            AwsServiceError::aws_error(
884                StatusCode::NOT_FOUND,
885                "ResourceNotFound",
886                format!("Alarm {alarm_name} not found"),
887            )
888        })?;
889        alarm.state_value = new_state;
890        alarm.state_reason = state_reason;
891        alarm.state_updated_timestamp = Utc::now();
892
893        Ok(empty_metadata_response("SetAlarmState", &req.request_id))
894    }
895
896    fn describe_alarm_history(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
897        // Minimal implementation: return empty history. AWS pagination tokens are
898        // not tracked locally, so callers see an empty list rather than a stub.
899        let inner = String::from("<AlarmHistoryItems></AlarmHistoryItems>");
900        Ok(xml_response(
901            "DescribeAlarmHistory",
902            &inner,
903            &req.request_id,
904        ))
905    }
906
907    fn put_dashboard(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
908        let dashboard_name = req
909            .query_params
910            .get("DashboardName")
911            .ok_or_else(|| invalid_param("DashboardName is required"))?
912            .clone();
913        let body = req
914            .query_params
915            .get("DashboardBody")
916            .ok_or_else(|| invalid_param("DashboardBody is required"))?
917            .clone();
918        // AWS validates that DashboardBody parses as JSON; we do the same so
919        // bad bodies surface a useful error before persisting.
920        if serde_json::from_str::<serde_json::Value>(&body).is_err() {
921            return Err(AwsServiceError::aws_error(
922                StatusCode::BAD_REQUEST,
923                "InvalidParameterInput",
924                "DashboardBody must be a valid JSON object",
925            ));
926        }
927        let arn = format!(
928            "arn:aws:cloudwatch::{}:dashboard/{dashboard_name}",
929            req.account_id
930        );
931        let dashboard = Dashboard {
932            name: dashboard_name.clone(),
933            arn,
934            size_bytes: body.len() as i64,
935            body,
936            last_modified: Utc::now(),
937        };
938        let mut state = self.state.write();
939        let acct = state.get_or_create(&req.account_id);
940        acct.dashboards.insert(dashboard_name, dashboard);
941        // PutDashboard returns DashboardValidationMessages — empty when the
942        // body parses cleanly.
943        let inner = String::from("<DashboardValidationMessages/>");
944        Ok(xml_response("PutDashboard", &inner, &req.request_id))
945    }
946
947    fn get_dashboard(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
948        let name = req
949            .query_params
950            .get("DashboardName")
951            .ok_or_else(|| invalid_param("DashboardName is required"))?
952            .clone();
953        let state = self.state.read();
954        let dashboard = state
955            .get(&req.account_id)
956            .and_then(|a| a.dashboards.get(&name))
957            .cloned()
958            .ok_or_else(|| {
959                AwsServiceError::aws_error(
960                    StatusCode::NOT_FOUND,
961                    "ResourceNotFound",
962                    format!("Dashboard {name} does not exist"),
963                )
964            })?;
965        let inner = format!(
966            "<DashboardArn>{}</DashboardArn><DashboardBody>{}</DashboardBody><DashboardName>{}</DashboardName>",
967            xml_escape(&dashboard.arn),
968            xml_escape(&dashboard.body),
969            xml_escape(&dashboard.name),
970        );
971        Ok(xml_response("GetDashboard", &inner, &req.request_id))
972    }
973
974    fn delete_dashboards(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
975        let mut names: Vec<String> = Vec::new();
976        for (k, v) in req.query_params.iter() {
977            if k.starts_with("DashboardNames.member.") {
978                names.push(v.clone());
979            }
980        }
981        if names.is_empty() {
982            return Err(invalid_param(
983                "DashboardNames must contain at least one name",
984            ));
985        }
986        let mut state = self.state.write();
987        let acct = state.get_or_create(&req.account_id);
988        for n in names {
989            acct.dashboards.remove(&n);
990        }
991        Ok(empty_metadata_response("DeleteDashboards", &req.request_id))
992    }
993
994    fn list_dashboards(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
995        let prefix = req.query_params.get("DashboardNamePrefix").cloned();
996        let state = self.state.read();
997        let dashboards: Vec<Dashboard> = state
998            .get(&req.account_id)
999            .map(|a| {
1000                a.dashboards
1001                    .values()
1002                    .filter(|d| prefix.as_ref().is_none_or(|p| d.name.starts_with(p)))
1003                    .cloned()
1004                    .collect()
1005            })
1006            .unwrap_or_default();
1007        let mut entries = String::new();
1008        for d in &dashboards {
1009            entries.push_str("<member>");
1010            entries.push_str(&format!(
1011                "<DashboardArn>{}</DashboardArn><DashboardName>{}</DashboardName><LastModified>{}</LastModified><Size>{}</Size>",
1012                xml_escape(&d.arn),
1013                xml_escape(&d.name),
1014                d.last_modified.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
1015                d.size_bytes,
1016            ));
1017            entries.push_str("</member>");
1018        }
1019        let inner = format!("<DashboardEntries>{entries}</DashboardEntries>");
1020        Ok(xml_response("ListDashboards", &inner, &req.request_id))
1021    }
1022}
1023
1024fn render_alarm(alarm: &MetricAlarm) -> String {
1025    let mut s = String::from("<member>");
1026    s.push_str(&format!(
1027        "<AlarmName>{}</AlarmName>",
1028        xml_escape(&alarm.alarm_name)
1029    ));
1030    s.push_str(&format!(
1031        "<AlarmArn>{}</AlarmArn>",
1032        xml_escape(&alarm.alarm_arn)
1033    ));
1034    if let Some(d) = &alarm.alarm_description {
1035        s.push_str(&format!(
1036            "<AlarmDescription>{}</AlarmDescription>",
1037            xml_escape(d)
1038        ));
1039    }
1040    s.push_str(&format!(
1041        "<ActionsEnabled>{}</ActionsEnabled>",
1042        alarm.actions_enabled
1043    ));
1044    push_action_list(&mut s, "OKActions", &alarm.ok_actions);
1045    push_action_list(&mut s, "AlarmActions", &alarm.alarm_actions);
1046    push_action_list(
1047        &mut s,
1048        "InsufficientDataActions",
1049        &alarm.insufficient_data_actions,
1050    );
1051    s.push_str(&format!(
1052        "<StateValue>{}</StateValue>",
1053        alarm.state_value.as_str()
1054    ));
1055    s.push_str(&format!(
1056        "<StateReason>{}</StateReason>",
1057        xml_escape(&alarm.state_reason)
1058    ));
1059    s.push_str(&format!(
1060        "<StateUpdatedTimestamp>{}</StateUpdatedTimestamp>",
1061        alarm
1062            .state_updated_timestamp
1063            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
1064    ));
1065    if let Some(m) = &alarm.metric_name {
1066        s.push_str(&format!("<MetricName>{}</MetricName>", xml_escape(m)));
1067    }
1068    if let Some(n) = &alarm.namespace {
1069        s.push_str(&format!("<Namespace>{}</Namespace>", xml_escape(n)));
1070    }
1071    if let Some(stat) = &alarm.statistic {
1072        s.push_str(&format!("<Statistic>{}</Statistic>", xml_escape(stat)));
1073    }
1074    if let Some(ext) = &alarm.extended_statistic {
1075        s.push_str(&format!(
1076            "<ExtendedStatistic>{}</ExtendedStatistic>",
1077            xml_escape(ext)
1078        ));
1079    }
1080    s.push_str(&render_dimensions(&alarm.dimensions));
1081    if let Some(p) = alarm.period {
1082        s.push_str(&format!("<Period>{p}</Period>"));
1083    }
1084    if let Some(u) = &alarm.unit {
1085        s.push_str(&format!("<Unit>{}</Unit>", xml_escape(u)));
1086    }
1087    s.push_str(&format!(
1088        "<EvaluationPeriods>{}</EvaluationPeriods>",
1089        alarm.evaluation_periods
1090    ));
1091    if let Some(d) = alarm.datapoints_to_alarm {
1092        s.push_str(&format!("<DatapointsToAlarm>{d}</DatapointsToAlarm>"));
1093    }
1094    if let Some(t) = alarm.threshold {
1095        s.push_str(&format!("<Threshold>{t}</Threshold>"));
1096    }
1097    s.push_str(&format!(
1098        "<ComparisonOperator>{}</ComparisonOperator>",
1099        xml_escape(&alarm.comparison_operator)
1100    ));
1101    if let Some(t) = &alarm.treat_missing_data {
1102        s.push_str(&format!(
1103            "<TreatMissingData>{}</TreatMissingData>",
1104            xml_escape(t)
1105        ));
1106    }
1107    if let Some(e) = &alarm.evaluate_low_sample_count_percentile {
1108        s.push_str(&format!(
1109            "<EvaluateLowSampleCountPercentile>{}</EvaluateLowSampleCountPercentile>",
1110            xml_escape(e)
1111        ));
1112    }
1113    s.push_str(&format!(
1114        "<AlarmConfigurationUpdatedTimestamp>{}</AlarmConfigurationUpdatedTimestamp>",
1115        alarm
1116            .alarm_configuration_updated_timestamp
1117            .to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
1118    ));
1119    s.push_str("</member>");
1120    s
1121}
1122
1123fn push_action_list(s: &mut String, name: &str, actions: &[String]) {
1124    s.push_str(&format!("<{name}>"));
1125    for action in actions {
1126        s.push_str(&format!("<member>{}</member>", xml_escape(action)));
1127    }
1128    s.push_str(&format!("</{name}>"));
1129}