1use std::collections::{BTreeMap, HashMap};
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use http::StatusCode;
6
7use fakecloud_core::query::{
8 optional_query_param, query_metadata_only_xml, query_response_xml, required_query_param,
9};
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11
12use std::sync::Arc;
13
14use fakecloud_persistence::SnapshotStore;
15use tokio::sync::Mutex;
16
17use crate::state::{
18 AlarmState, CloudWatchSnapshot, Dashboard, MetricAlarm, MetricDatum, SharedCloudWatchState,
19 StatisticSet, CLOUDWATCH_SNAPSHOT_SCHEMA_VERSION,
20};
21
/// XML namespace URI handed to the query-protocol response builders for every reply in this file.
const NS: &str = "http://monitoring.amazonaws.com/doc/2010-08-01/";
23
/// Every action name that `handle` dispatches to a handler.
///
/// Keep this list in sync with the `match` in `handle`: the four dashboard
/// actions are dispatched there and therefore have to be advertised here too.
const SUPPORTED_ACTIONS: &[&str] = &[
    "PutMetricData",
    "GetMetricStatistics",
    "GetMetricData",
    "ListMetrics",
    "PutMetricAlarm",
    "DescribeAlarms",
    "DescribeAlarmsForMetric",
    "DeleteAlarms",
    "EnableAlarmActions",
    "DisableAlarmActions",
    "SetAlarmState",
    "DescribeAlarmHistory",
    "PutDashboard",
    "GetDashboard",
    "DeleteDashboards",
    "ListDashboards",
];
38
/// Handler for the "monitoring" (CloudWatch) service: shared state plus optional
/// snapshot persistence.
pub struct CloudWatchService {
    /// Shared CloudWatch state; handlers access it through `read()` / `write()` guards.
    state: SharedCloudWatchState,
    /// Destination for serialized snapshots; when `None`, `save_snapshot` returns immediately.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex that `save_snapshot` holds for the duration of each snapshot write.
    snapshot_lock: Arc<Mutex<()>>,
}
44
45impl CloudWatchService {
46 pub fn new(state: SharedCloudWatchState) -> Self {
47 Self {
48 state,
49 snapshot_store: None,
50 snapshot_lock: Arc::new(Mutex::new(())),
51 }
52 }
53
54 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
58 self.snapshot_store = Some(store);
59 self
60 }
61
62 pub(crate) async fn save_snapshot(&self) {
66 let Some(store) = self.snapshot_store.clone() else {
67 return;
68 };
69 let _guard = self.snapshot_lock.lock().await;
70 let snapshot = CloudWatchSnapshot {
71 schema_version: CLOUDWATCH_SNAPSHOT_SCHEMA_VERSION,
72 accounts: self.state.read().clone_for_snapshot(),
73 };
74 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
75 let bytes = serde_json::to_vec(&snapshot)
76 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
77 store.save(&bytes)
78 })
79 .await;
80 match join {
81 Ok(Ok(())) => {}
82 Ok(Err(err)) => tracing::error!(%err, "failed to write cloudwatch snapshot"),
83 Err(err) => tracing::error!(%err, "cloudwatch snapshot task panicked"),
84 }
85 }
86}
87
88#[async_trait]
89impl AwsService for CloudWatchService {
90 fn service_name(&self) -> &str {
91 "monitoring"
92 }
93
94 fn supported_actions(&self) -> &[&str] {
95 SUPPORTED_ACTIONS
96 }
97
98 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
99 let mutates = matches!(
100 req.action.as_str(),
101 "PutMetricData"
102 | "PutMetricAlarm"
103 | "DeleteAlarms"
104 | "EnableAlarmActions"
105 | "DisableAlarmActions"
106 | "SetAlarmState"
107 | "PutDashboard"
108 | "DeleteDashboards"
109 );
110 let result = match req.action.as_str() {
111 "PutMetricData" => self.put_metric_data(&req),
112 "GetMetricStatistics" => self.get_metric_statistics(&req),
113 "GetMetricData" => self.get_metric_data(&req),
114 "ListMetrics" => self.list_metrics(&req),
115 "PutMetricAlarm" => self.put_metric_alarm(&req),
116 "DescribeAlarms" => self.describe_alarms(&req),
117 "DescribeAlarmsForMetric" => self.describe_alarms_for_metric(&req),
118 "DeleteAlarms" => self.delete_alarms(&req),
119 "EnableAlarmActions" => self.enable_alarm_actions(&req),
120 "DisableAlarmActions" => self.disable_alarm_actions(&req),
121 "SetAlarmState" => self.set_alarm_state(&req),
122 "DescribeAlarmHistory" => self.describe_alarm_history(&req),
123 "PutDashboard" => self.put_dashboard(&req),
124 "GetDashboard" => self.get_dashboard(&req),
125 "DeleteDashboards" => self.delete_dashboards(&req),
126 "ListDashboards" => self.list_dashboards(&req),
127 _ => Err(AwsServiceError::action_not_implemented(
128 "monitoring",
129 &req.action,
130 )),
131 };
132 if mutates && result.is_ok() {
133 self.save_snapshot().await;
134 }
135 result
136 }
137}
138
139fn xml_response(action: &str, inner: &str, request_id: &str) -> AwsResponse {
140 AwsResponse::xml(
141 StatusCode::OK,
142 query_response_xml(action, NS, inner, request_id),
143 )
144}
145
146fn empty_metadata_response(action: &str, request_id: &str) -> AwsResponse {
147 AwsResponse::xml(
148 StatusCode::OK,
149 query_metadata_only_xml(action, NS, request_id),
150 )
151}
152
153fn invalid_param(message: impl Into<String>) -> AwsServiceError {
154 AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "InvalidParameterValue", message)
155}
156
157fn collect_indexed(req: &AwsRequest, prefix: &str) -> Vec<HashMap<String, String>> {
158 let mut by_index: BTreeMap<u32, HashMap<String, String>> = BTreeMap::new();
159 let needle = format!("{prefix}.member.");
160 for (k, v) in req.query_params.iter() {
161 let Some(rest) = k.strip_prefix(&needle) else {
162 continue;
163 };
164 let mut parts = rest.splitn(2, '.');
165 let Some(idx_str) = parts.next() else {
166 continue;
167 };
168 let Ok(idx) = idx_str.parse::<u32>() else {
169 continue;
170 };
171 let field = parts.next().unwrap_or("").to_string();
172 by_index.entry(idx).or_default().insert(field, v.clone());
173 }
174 by_index.into_values().collect()
175}
176
/// Extracts dimension name/value pairs from a flattened member map.
///
/// Looks for keys `{prefix}.member.{N}.Name` and `{prefix}.member.{N}.Value`
/// and pairs them up per index `N`. Indices that lack either half, and keys
/// whose index is not a `u32`, are dropped. Pairs are inserted in ascending
/// index order, so a repeated dimension name keeps the value of the highest index.
fn parse_dimensions(member: &HashMap<String, String>, prefix: &str) -> BTreeMap<String, String> {
    let member_prefix = format!("{prefix}.member.");
    let mut slots: BTreeMap<u32, (Option<String>, Option<String>)> = BTreeMap::new();
    for (key, value) in member {
        let Some(tail) = key.strip_prefix(member_prefix.as_str()) else {
            continue;
        };
        let (index_text, field) = tail.split_once('.').unwrap_or((tail, ""));
        let Ok(index) = index_text.parse::<u32>() else {
            continue;
        };
        let slot = slots.entry(index).or_default();
        match field {
            "Name" => slot.0 = Some(value.clone()),
            "Value" => slot.1 = Some(value.clone()),
            _ => {}
        }
    }
    slots
        .into_values()
        .filter_map(|(name, value)| name.zip(value))
        .collect()
}
207
208fn parse_dimensions_query(req: &AwsRequest, prefix: &str) -> BTreeMap<String, String> {
209 let mut dims: BTreeMap<u32, (Option<String>, Option<String>)> = BTreeMap::new();
210 let needle = format!("{prefix}.member.");
211 for (k, v) in req.query_params.iter() {
212 let Some(rest) = k.strip_prefix(&needle) else {
213 continue;
214 };
215 let mut parts = rest.splitn(2, '.');
216 let Some(idx_str) = parts.next() else {
217 continue;
218 };
219 let Ok(idx) = idx_str.parse::<u32>() else {
220 continue;
221 };
222 let field = parts.next().unwrap_or("");
223 let entry = dims.entry(idx).or_default();
224 match field {
225 "Name" => entry.0 = Some(v.clone()),
226 "Value" => entry.1 = Some(v.clone()),
227 _ => {}
228 }
229 }
230 let mut out = BTreeMap::new();
231 for (_, (name, value)) in dims {
232 if let (Some(n), Some(v)) = (name, value) {
233 out.insert(n, v);
234 }
235 }
236 out
237}
238
/// Escapes the five characters that have predefined XML entities
/// (`&`, `<`, `>`, `"`, `'`) so `s` can be embedded in element content or
/// attribute values.
///
/// Works in a single pass over the input, so an ampersand that is already part
/// of an entity in `s` is escaped again (`&amp;` becomes `&amp;amp;`), which is
/// the correct treatment for raw text.
fn xml_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}
246
/// Aggregate of one or more metric data points, as built by `datum_stats`
/// and combined by `merge_stats`.
#[derive(Clone, Copy)]
struct DatumStats {
    /// Sum of the contributing values.
    sum: f64,
    /// Smallest contributing value.
    min: f64,
    /// Largest contributing value.
    max: f64,
    /// Number of samples represented (1 for a plain value, the reported
    /// sample count for a statistic set).
    count: f64,
}
257
258fn datum_stats(d: &MetricDatum) -> Option<DatumStats> {
259 if let Some(v) = d.value {
260 return Some(DatumStats {
261 sum: v,
262 min: v,
263 max: v,
264 count: 1.0,
265 });
266 }
267 if let Some(s) = &d.statistic_values {
268 return Some(DatumStats {
269 sum: s.sum,
270 min: s.minimum,
271 max: s.maximum,
272 count: s.sample_count,
273 });
274 }
275 None
276}
277
278fn merge_stats(acc: &mut DatumStats, other: DatumStats) {
279 acc.sum += other.sum;
280 acc.count += other.count;
281 if other.min < acc.min {
282 acc.min = other.min;
283 }
284 if other.max > acc.max {
285 acc.max = other.max;
286 }
287}
288
289fn stat_value(stat: &str, agg: DatumStats) -> Option<f64> {
290 match stat {
291 "Sum" => Some(agg.sum),
292 "Average" => {
293 if agg.count > 0.0 {
294 Some(agg.sum / agg.count)
295 } else {
296 None
297 }
298 }
299 "Minimum" => Some(agg.min),
300 "Maximum" => Some(agg.max),
301 "SampleCount" => Some(agg.count),
302 _ => None,
303 }
304}
305
306fn render_dimensions(dims: &BTreeMap<String, String>) -> String {
307 let mut s = String::from("<Dimensions>");
308 for (name, value) in dims.iter() {
309 s.push_str(&format!(
310 "<member><Name>{}</Name><Value>{}</Value></member>",
311 xml_escape(name),
312 xml_escape(value),
313 ));
314 }
315 s.push_str("</Dimensions>");
316 s
317}
318
319impl CloudWatchService {
320 fn put_metric_data(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
321 let namespace = required_query_param(req, "Namespace")?;
322 let members = collect_indexed(req, "MetricData");
323 if members.is_empty() {
324 return Err(invalid_param(
325 "PutMetricData requires at least one MetricData entry",
326 ));
327 }
328
329 let now = Utc::now();
330 let mut state = self.state.write();
331 let acct = state.get_or_create(&req.account_id);
332 let metrics_map = acct.metrics_in_mut(&req.region);
333 let bucket = metrics_map.entry(namespace.clone()).or_default();
334
335 for member in members {
336 let metric_name = member
337 .get("MetricName")
338 .cloned()
339 .ok_or_else(|| invalid_param("MetricData.member.N.MetricName is required"))?;
340 let value = member
341 .get("Value")
342 .map(|s| s.parse::<f64>())
343 .transpose()
344 .map_err(|_| invalid_param("Value must be a valid number"))?;
345 let timestamp = member
346 .get("Timestamp")
347 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
348 .map(|d| d.with_timezone(&Utc))
349 .unwrap_or(now);
350 let unit = member.get("Unit").cloned();
351 let storage_resolution = member
352 .get("StorageResolution")
353 .and_then(|s| s.parse::<i64>().ok());
354 let dimensions = parse_dimensions(&member, "Dimensions");
355
356 let statistic_values = if let (Some(sc), Some(sum), Some(min), Some(max)) = (
357 member.get("StatisticValues.SampleCount"),
358 member.get("StatisticValues.Sum"),
359 member.get("StatisticValues.Minimum"),
360 member.get("StatisticValues.Maximum"),
361 ) {
362 Some(StatisticSet {
363 sample_count: sc.parse::<f64>().map_err(|_| {
364 invalid_param("StatisticValues.SampleCount must be a number")
365 })?,
366 sum: sum
367 .parse::<f64>()
368 .map_err(|_| invalid_param("StatisticValues.Sum must be a number"))?,
369 minimum: min
370 .parse::<f64>()
371 .map_err(|_| invalid_param("StatisticValues.Minimum must be a number"))?,
372 maximum: max
373 .parse::<f64>()
374 .map_err(|_| invalid_param("StatisticValues.Maximum must be a number"))?,
375 })
376 } else {
377 None
378 };
379
380 if value.is_none() && statistic_values.is_none() {
381 return Err(invalid_param(
382 "MetricData entry must supply either Value or StatisticValues",
383 ));
384 }
385
386 bucket.push(MetricDatum {
387 metric_name,
388 dimensions,
389 timestamp,
390 value,
391 statistic_values,
392 unit,
393 storage_resolution,
394 });
395 }
396
397 Ok(empty_metadata_response("PutMetricData", &req.request_id))
398 }
399
400 fn list_metrics(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
401 let namespace = optional_query_param(req, "Namespace");
402 let metric_name = optional_query_param(req, "MetricName");
403 let dim_filter = parse_dimensions_query(req, "Dimensions");
404
405 let state = self.state.read();
406 let mut out = String::from("<Metrics>");
407 if let Some(acct) = state.get(&req.account_id) {
408 if let Some(map) = acct.metrics_in(&req.region) {
409 for (ns, data) in map.iter() {
410 if let Some(filter_ns) = namespace.as_ref() {
411 if ns != filter_ns {
412 continue;
413 }
414 }
415 let mut seen: BTreeMap<(String, BTreeMap<String, String>), ()> =
416 BTreeMap::new();
417 for d in data.iter() {
418 if let Some(filter_name) = metric_name.as_ref() {
419 if &d.metric_name != filter_name {
420 continue;
421 }
422 }
423 if !dim_filter.is_empty()
424 && !dim_filter
425 .iter()
426 .all(|(k, v)| d.dimensions.get(k) == Some(v))
427 {
428 continue;
429 }
430 seen.insert((d.metric_name.clone(), d.dimensions.clone()), ());
431 }
432 for ((name, dims), _) in seen {
433 out.push_str("<member>");
434 out.push_str(&format!("<Namespace>{}</Namespace>", xml_escape(ns)));
435 out.push_str(&format!("<MetricName>{}</MetricName>", xml_escape(&name)));
436 out.push_str(&render_dimensions(&dims));
437 out.push_str("</member>");
438 }
439 }
440 }
441 }
442 out.push_str("</Metrics>");
443
444 Ok(xml_response("ListMetrics", &out, &req.request_id))
445 }
446
447 fn get_metric_statistics(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
448 let namespace = required_query_param(req, "Namespace")?;
449 let metric_name = required_query_param(req, "MetricName")?;
450 let start = required_query_param(req, "StartTime")?;
451 let end = required_query_param(req, "EndTime")?;
452 let period = required_query_param(req, "Period")?
453 .parse::<i64>()
454 .map_err(|_| invalid_param("Period must be an integer"))?;
455 if period <= 0 {
456 return Err(invalid_param("Period must be positive"));
457 }
458 let start_ts = DateTime::parse_from_rfc3339(&start)
459 .map_err(|_| invalid_param("StartTime must be ISO 8601"))?
460 .with_timezone(&Utc);
461 let end_ts = DateTime::parse_from_rfc3339(&end)
462 .map_err(|_| invalid_param("EndTime must be ISO 8601"))?
463 .with_timezone(&Utc);
464
465 let mut statistics: Vec<String> = Vec::new();
466 for (k, v) in req.query_params.iter() {
467 if k.starts_with("Statistics.member.") {
468 statistics.push(v.clone());
469 }
470 }
471 if statistics.is_empty() {
472 return Err(invalid_param("At least one Statistic is required"));
473 }
474
475 let dim_filter = parse_dimensions_query(req, "Dimensions");
476
477 let state = self.state.read();
478 let mut datapoints: Vec<(DateTime<Utc>, BTreeMap<String, f64>)> = Vec::new();
479 if let Some(acct) = state.get(&req.account_id) {
480 if let Some(map) = acct.metrics_in(&req.region) {
481 if let Some(data) = map.get(&namespace) {
482 let mut buckets: BTreeMap<DateTime<Utc>, DatumStats> = BTreeMap::new();
483 for d in data.iter() {
484 if d.metric_name != metric_name {
485 continue;
486 }
487 if !dim_filter
488 .iter()
489 .all(|(k, v)| d.dimensions.get(k) == Some(v))
490 {
491 continue;
492 }
493 if d.timestamp < start_ts || d.timestamp >= end_ts {
494 continue;
495 }
496 let Some(stats) = datum_stats(d) else {
497 continue;
498 };
499 let secs = d.timestamp.timestamp();
500 let bucket_secs = secs - secs.rem_euclid(period);
501 let bucket_ts =
502 DateTime::<Utc>::from_timestamp(bucket_secs, 0).unwrap_or(d.timestamp);
503 buckets
504 .entry(bucket_ts)
505 .and_modify(|acc| merge_stats(acc, stats))
506 .or_insert(stats);
507 }
508 for (ts, agg) in buckets {
509 let mut stats = BTreeMap::new();
510 for stat in statistics.iter() {
511 if let Some(v) = stat_value(stat, agg) {
512 stats.insert(stat.clone(), v);
513 }
514 }
515 datapoints.push((ts, stats));
516 }
517 }
518 }
519 }
520
521 let mut inner = format!("<Label>{}</Label>", xml_escape(&metric_name));
522 inner.push_str("<Datapoints>");
523 for (ts, stats) in datapoints {
524 inner.push_str("<member>");
525 inner.push_str(&format!(
526 "<Timestamp>{}</Timestamp>",
527 ts.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
528 ));
529 for (name, value) in stats {
530 inner.push_str(&format!("<{name}>{value}</{name}>"));
531 }
532 inner.push_str("</member>");
533 }
534 inner.push_str("</Datapoints>");
535
536 Ok(xml_response("GetMetricStatistics", &inner, &req.request_id))
537 }
538
539 fn get_metric_data(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
540 let start = required_query_param(req, "StartTime")?;
541 let end = required_query_param(req, "EndTime")?;
542 let start_ts = DateTime::parse_from_rfc3339(&start)
543 .map_err(|_| invalid_param("StartTime must be ISO 8601"))?
544 .with_timezone(&Utc);
545 let end_ts = DateTime::parse_from_rfc3339(&end)
546 .map_err(|_| invalid_param("EndTime must be ISO 8601"))?
547 .with_timezone(&Utc);
548
549 let queries = collect_indexed(req, "MetricDataQueries");
550 if queries.is_empty() {
551 return Err(invalid_param(
552 "MetricDataQueries must contain at least one entry",
553 ));
554 }
555
556 let state = self.state.read();
557 let mut inner = String::from("<MetricDataResults>");
558 for q in queries {
559 let id = q.get("Id").cloned().unwrap_or_default();
560 let label = q.get("Label").cloned().unwrap_or_else(|| id.clone());
561 let stat = q
562 .get("MetricStat.Stat")
563 .cloned()
564 .unwrap_or_else(|| "Sum".to_string());
565 let metric_name = q.get("MetricStat.Metric.MetricName").cloned();
566 let namespace = q.get("MetricStat.Metric.Namespace").cloned();
567 let period: i64 = q
568 .get("MetricStat.Period")
569 .and_then(|s| s.parse::<i64>().ok())
570 .unwrap_or(60);
571 if period <= 0 {
572 return Err(invalid_param(
573 "MetricStat.Period must be a positive integer",
574 ));
575 }
576 let dim_filter = parse_dimensions(&q, "MetricStat.Metric.Dimensions");
577
578 let (mut timestamps, mut values): (Vec<String>, Vec<f64>) = (Vec::new(), Vec::new());
579 if let (Some(metric_name), Some(namespace)) = (metric_name, namespace) {
580 if let Some(acct) = state.get(&req.account_id) {
581 if let Some(map) = acct.metrics_in(&req.region) {
582 if let Some(data) = map.get(&namespace) {
583 let mut buckets: BTreeMap<DateTime<Utc>, DatumStats> = BTreeMap::new();
584 for d in data.iter() {
585 if d.metric_name != metric_name {
586 continue;
587 }
588 if !dim_filter
589 .iter()
590 .all(|(k, v)| d.dimensions.get(k) == Some(v))
591 {
592 continue;
593 }
594 if d.timestamp < start_ts || d.timestamp >= end_ts {
595 continue;
596 }
597 let Some(stats) = datum_stats(d) else {
598 continue;
599 };
600 let secs = d.timestamp.timestamp();
601 let bucket_secs = secs - secs.rem_euclid(period);
602 let bucket_ts = DateTime::<Utc>::from_timestamp(bucket_secs, 0)
603 .unwrap_or(d.timestamp);
604 buckets
605 .entry(bucket_ts)
606 .and_modify(|acc| merge_stats(acc, stats))
607 .or_insert(stats);
608 }
609 for (ts, agg) in buckets {
610 let Some(v) = stat_value(&stat, agg) else {
611 continue;
612 };
613 timestamps
614 .push(ts.to_rfc3339_opts(chrono::SecondsFormat::Millis, true));
615 values.push(v);
616 }
617 }
618 }
619 }
620 }
621
622 inner.push_str("<member>");
623 inner.push_str(&format!("<Id>{}</Id>", xml_escape(&id)));
624 inner.push_str(&format!("<Label>{}</Label>", xml_escape(&label)));
625 inner.push_str("<StatusCode>Complete</StatusCode>");
626 inner.push_str("<Timestamps>");
627 for ts in timestamps {
628 inner.push_str(&format!("<member>{ts}</member>"));
629 }
630 inner.push_str("</Timestamps>");
631 inner.push_str("<Values>");
632 for v in values {
633 inner.push_str(&format!("<member>{v}</member>"));
634 }
635 inner.push_str("</Values>");
636 inner.push_str("</member>");
637 }
638 inner.push_str("</MetricDataResults>");
639 inner.push_str("<Messages></Messages>");
640
641 Ok(xml_response("GetMetricData", &inner, &req.request_id))
642 }
643
644 fn put_metric_alarm(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
645 let alarm_name = required_query_param(req, "AlarmName")?;
646 let comparison = required_query_param(req, "ComparisonOperator")?;
647 let evaluation_periods = required_query_param(req, "EvaluationPeriods")?
648 .parse::<i64>()
649 .map_err(|_| invalid_param("EvaluationPeriods must be an integer"))?;
650
651 let alarm_description = optional_query_param(req, "AlarmDescription");
652 let actions_enabled = optional_query_param(req, "ActionsEnabled")
653 .map(|s| s.eq_ignore_ascii_case("true"))
654 .unwrap_or(true);
655
656 let metric_name = optional_query_param(req, "MetricName");
657 let namespace = optional_query_param(req, "Namespace");
658 let statistic = optional_query_param(req, "Statistic");
659 let extended_statistic = optional_query_param(req, "ExtendedStatistic");
660 let period = optional_query_param(req, "Period").and_then(|s| s.parse::<i64>().ok());
661 let unit = optional_query_param(req, "Unit");
662 let datapoints_to_alarm =
663 optional_query_param(req, "DatapointsToAlarm").and_then(|s| s.parse::<i64>().ok());
664 let threshold = optional_query_param(req, "Threshold").and_then(|s| s.parse::<f64>().ok());
665 let treat_missing_data = optional_query_param(req, "TreatMissingData");
666 let evaluate_low_sample_count_percentile =
667 optional_query_param(req, "EvaluateLowSampleCountPercentile");
668 let dimensions = parse_dimensions_query(req, "Dimensions");
669
670 let mut ok_actions = Vec::new();
671 let mut alarm_actions = Vec::new();
672 let mut insufficient_data_actions = Vec::new();
673 for (k, v) in req.query_params.iter() {
674 if k.starts_with("OKActions.member.") {
675 ok_actions.push(v.clone());
676 } else if k.starts_with("AlarmActions.member.") {
677 alarm_actions.push(v.clone());
678 } else if k.starts_with("InsufficientDataActions.member.") {
679 insufficient_data_actions.push(v.clone());
680 }
681 }
682
683 let arn = format!(
684 "arn:aws:cloudwatch:{}:{}:alarm:{}",
685 req.region, req.account_id, alarm_name
686 );
687 let now = Utc::now();
688
689 let mut state = self.state.write();
690 let acct = state.get_or_create(&req.account_id);
691 let alarms = acct.alarms_in_mut(&req.region);
692 let existing = alarms.get(&alarm_name).cloned();
693 let alarm = MetricAlarm {
694 alarm_name: alarm_name.clone(),
695 alarm_arn: arn,
696 alarm_description,
697 actions_enabled,
698 ok_actions,
699 alarm_actions,
700 insufficient_data_actions,
701 state_value: existing
702 .as_ref()
703 .map(|a| a.state_value)
704 .unwrap_or(AlarmState::InsufficientData),
705 state_reason: existing
706 .as_ref()
707 .map(|a| a.state_reason.clone())
708 .unwrap_or_else(|| "Unchecked: Initial alarm creation".to_string()),
709 state_updated_timestamp: existing
710 .as_ref()
711 .map(|a| a.state_updated_timestamp)
712 .unwrap_or(now),
713 metric_name,
714 namespace,
715 statistic,
716 extended_statistic,
717 dimensions,
718 period,
719 unit,
720 evaluation_periods,
721 datapoints_to_alarm,
722 threshold,
723 comparison_operator: comparison,
724 treat_missing_data,
725 evaluate_low_sample_count_percentile,
726 configuration_updated_timestamp: existing
727 .as_ref()
728 .map(|a| a.configuration_updated_timestamp)
729 .unwrap_or(now),
730 alarm_configuration_updated_timestamp: now,
731 };
732 alarms.insert(alarm_name, alarm);
733
734 Ok(empty_metadata_response("PutMetricAlarm", &req.request_id))
735 }
736
737 fn describe_alarms(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
738 let mut filter_names: Vec<String> = Vec::new();
739 for (k, v) in req.query_params.iter() {
740 if k.starts_with("AlarmNames.member.") {
741 filter_names.push(v.clone());
742 }
743 }
744 let prefix = optional_query_param(req, "AlarmNamePrefix");
745 let state_filter = optional_query_param(req, "StateValue");
746 let action_prefix = optional_query_param(req, "ActionPrefix");
747
748 let state = self.state.read();
749 let mut inner = String::from("<MetricAlarms>");
750 if let Some(acct) = state.get(&req.account_id) {
751 if let Some(alarms) = acct.alarms_in(&req.region) {
752 for alarm in alarms.values() {
753 if !filter_names.is_empty() && !filter_names.contains(&alarm.alarm_name) {
754 continue;
755 }
756 if let Some(p) = prefix.as_ref() {
757 if !alarm.alarm_name.starts_with(p) {
758 continue;
759 }
760 }
761 if let Some(sv) = state_filter.as_ref() {
762 if alarm.state_value.as_str() != sv {
763 continue;
764 }
765 }
766 if let Some(ap) = action_prefix.as_ref() {
767 let any = alarm
768 .alarm_actions
769 .iter()
770 .chain(alarm.ok_actions.iter())
771 .chain(alarm.insufficient_data_actions.iter())
772 .any(|a| a.starts_with(ap));
773 if !any {
774 continue;
775 }
776 }
777 inner.push_str(&render_alarm(alarm));
778 }
779 }
780 }
781 inner.push_str("</MetricAlarms>");
782 inner.push_str("<CompositeAlarms></CompositeAlarms>");
783
784 Ok(xml_response("DescribeAlarms", &inner, &req.request_id))
785 }
786
787 fn describe_alarms_for_metric(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
788 let metric_name = required_query_param(req, "MetricName")?;
789 let namespace = required_query_param(req, "Namespace")?;
790 let dim_filter = parse_dimensions_query(req, "Dimensions");
791
792 let state = self.state.read();
793 let mut inner = String::from("<MetricAlarms>");
794 if let Some(acct) = state.get(&req.account_id) {
795 if let Some(alarms) = acct.alarms_in(&req.region) {
796 for alarm in alarms.values() {
797 if alarm.metric_name.as_deref() != Some(&metric_name) {
798 continue;
799 }
800 if alarm.namespace.as_deref() != Some(&namespace) {
801 continue;
802 }
803 if !dim_filter.is_empty() && alarm.dimensions != dim_filter {
804 continue;
805 }
806 inner.push_str(&render_alarm(alarm));
807 }
808 }
809 }
810 inner.push_str("</MetricAlarms>");
811
812 Ok(xml_response(
813 "DescribeAlarmsForMetric",
814 &inner,
815 &req.request_id,
816 ))
817 }
818
819 fn delete_alarms(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
820 let mut names: Vec<String> = Vec::new();
821 for (k, v) in req.query_params.iter() {
822 if k.starts_with("AlarmNames.member.") {
823 names.push(v.clone());
824 }
825 }
826 if names.is_empty() {
827 return Err(invalid_param("AlarmNames must contain at least one name"));
828 }
829
830 let mut state = self.state.write();
831 let acct = state.get_or_create(&req.account_id);
832 let alarms = acct.alarms_in_mut(&req.region);
833 for name in names {
834 alarms.remove(&name);
835 }
836
837 Ok(empty_metadata_response("DeleteAlarms", &req.request_id))
838 }
839
840 fn enable_alarm_actions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
841 self.toggle_alarm_actions(req, true, "EnableAlarmActions")
842 }
843
844 fn disable_alarm_actions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
845 self.toggle_alarm_actions(req, false, "DisableAlarmActions")
846 }
847
848 fn toggle_alarm_actions(
849 &self,
850 req: &AwsRequest,
851 enabled: bool,
852 action_name: &str,
853 ) -> Result<AwsResponse, AwsServiceError> {
854 let mut names: Vec<String> = Vec::new();
855 for (k, v) in req.query_params.iter() {
856 if k.starts_with("AlarmNames.member.") {
857 names.push(v.clone());
858 }
859 }
860 let mut state = self.state.write();
861 let acct = state.get_or_create(&req.account_id);
862 let alarms = acct.alarms_in_mut(&req.region);
863 for name in names {
864 if let Some(alarm) = alarms.get_mut(&name) {
865 alarm.actions_enabled = enabled;
866 alarm.alarm_configuration_updated_timestamp = Utc::now();
867 }
868 }
869 Ok(empty_metadata_response(action_name, &req.request_id))
870 }
871
872 fn set_alarm_state(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
873 let alarm_name = required_query_param(req, "AlarmName")?;
874 let state_value = required_query_param(req, "StateValue")?;
875 let state_reason = required_query_param(req, "StateReason")?;
876 let new_state = AlarmState::parse(&state_value)
877 .ok_or_else(|| invalid_param("StateValue must be OK | ALARM | INSUFFICIENT_DATA"))?;
878
879 let mut state = self.state.write();
880 let acct = state.get_or_create(&req.account_id);
881 let alarms = acct.alarms_in_mut(&req.region);
882 let alarm = alarms.get_mut(&alarm_name).ok_or_else(|| {
883 AwsServiceError::aws_error(
884 StatusCode::NOT_FOUND,
885 "ResourceNotFound",
886 format!("Alarm {alarm_name} not found"),
887 )
888 })?;
889 alarm.state_value = new_state;
890 alarm.state_reason = state_reason;
891 alarm.state_updated_timestamp = Utc::now();
892
893 Ok(empty_metadata_response("SetAlarmState", &req.request_id))
894 }
895
896 fn describe_alarm_history(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
897 let inner = String::from("<AlarmHistoryItems></AlarmHistoryItems>");
900 Ok(xml_response(
901 "DescribeAlarmHistory",
902 &inner,
903 &req.request_id,
904 ))
905 }
906
907 fn put_dashboard(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
908 let dashboard_name = req
909 .query_params
910 .get("DashboardName")
911 .ok_or_else(|| invalid_param("DashboardName is required"))?
912 .clone();
913 let body = req
914 .query_params
915 .get("DashboardBody")
916 .ok_or_else(|| invalid_param("DashboardBody is required"))?
917 .clone();
918 if serde_json::from_str::<serde_json::Value>(&body).is_err() {
921 return Err(AwsServiceError::aws_error(
922 StatusCode::BAD_REQUEST,
923 "InvalidParameterInput",
924 "DashboardBody must be a valid JSON object",
925 ));
926 }
927 let arn = format!(
928 "arn:aws:cloudwatch::{}:dashboard/{dashboard_name}",
929 req.account_id
930 );
931 let dashboard = Dashboard {
932 name: dashboard_name.clone(),
933 arn,
934 size_bytes: body.len() as i64,
935 body,
936 last_modified: Utc::now(),
937 };
938 let mut state = self.state.write();
939 let acct = state.get_or_create(&req.account_id);
940 acct.dashboards.insert(dashboard_name, dashboard);
941 let inner = String::from("<DashboardValidationMessages/>");
944 Ok(xml_response("PutDashboard", &inner, &req.request_id))
945 }
946
947 fn get_dashboard(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
948 let name = req
949 .query_params
950 .get("DashboardName")
951 .ok_or_else(|| invalid_param("DashboardName is required"))?
952 .clone();
953 let state = self.state.read();
954 let dashboard = state
955 .get(&req.account_id)
956 .and_then(|a| a.dashboards.get(&name))
957 .cloned()
958 .ok_or_else(|| {
959 AwsServiceError::aws_error(
960 StatusCode::NOT_FOUND,
961 "ResourceNotFound",
962 format!("Dashboard {name} does not exist"),
963 )
964 })?;
965 let inner = format!(
966 "<DashboardArn>{}</DashboardArn><DashboardBody>{}</DashboardBody><DashboardName>{}</DashboardName>",
967 xml_escape(&dashboard.arn),
968 xml_escape(&dashboard.body),
969 xml_escape(&dashboard.name),
970 );
971 Ok(xml_response("GetDashboard", &inner, &req.request_id))
972 }
973
974 fn delete_dashboards(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
975 let mut names: Vec<String> = Vec::new();
976 for (k, v) in req.query_params.iter() {
977 if k.starts_with("DashboardNames.member.") {
978 names.push(v.clone());
979 }
980 }
981 if names.is_empty() {
982 return Err(invalid_param(
983 "DashboardNames must contain at least one name",
984 ));
985 }
986 let mut state = self.state.write();
987 let acct = state.get_or_create(&req.account_id);
988 for n in names {
989 acct.dashboards.remove(&n);
990 }
991 Ok(empty_metadata_response("DeleteDashboards", &req.request_id))
992 }
993
994 fn list_dashboards(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
995 let prefix = req.query_params.get("DashboardNamePrefix").cloned();
996 let state = self.state.read();
997 let dashboards: Vec<Dashboard> = state
998 .get(&req.account_id)
999 .map(|a| {
1000 a.dashboards
1001 .values()
1002 .filter(|d| prefix.as_ref().is_none_or(|p| d.name.starts_with(p)))
1003 .cloned()
1004 .collect()
1005 })
1006 .unwrap_or_default();
1007 let mut entries = String::new();
1008 for d in &dashboards {
1009 entries.push_str("<member>");
1010 entries.push_str(&format!(
1011 "<DashboardArn>{}</DashboardArn><DashboardName>{}</DashboardName><LastModified>{}</LastModified><Size>{}</Size>",
1012 xml_escape(&d.arn),
1013 xml_escape(&d.name),
1014 d.last_modified.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
1015 d.size_bytes,
1016 ));
1017 entries.push_str("</member>");
1018 }
1019 let inner = format!("<DashboardEntries>{entries}</DashboardEntries>");
1020 Ok(xml_response("ListDashboards", &inner, &req.request_id))
1021 }
1022}
1023
1024fn render_alarm(alarm: &MetricAlarm) -> String {
1025 let mut s = String::from("<member>");
1026 s.push_str(&format!(
1027 "<AlarmName>{}</AlarmName>",
1028 xml_escape(&alarm.alarm_name)
1029 ));
1030 s.push_str(&format!(
1031 "<AlarmArn>{}</AlarmArn>",
1032 xml_escape(&alarm.alarm_arn)
1033 ));
1034 if let Some(d) = &alarm.alarm_description {
1035 s.push_str(&format!(
1036 "<AlarmDescription>{}</AlarmDescription>",
1037 xml_escape(d)
1038 ));
1039 }
1040 s.push_str(&format!(
1041 "<ActionsEnabled>{}</ActionsEnabled>",
1042 alarm.actions_enabled
1043 ));
1044 push_action_list(&mut s, "OKActions", &alarm.ok_actions);
1045 push_action_list(&mut s, "AlarmActions", &alarm.alarm_actions);
1046 push_action_list(
1047 &mut s,
1048 "InsufficientDataActions",
1049 &alarm.insufficient_data_actions,
1050 );
1051 s.push_str(&format!(
1052 "<StateValue>{}</StateValue>",
1053 alarm.state_value.as_str()
1054 ));
1055 s.push_str(&format!(
1056 "<StateReason>{}</StateReason>",
1057 xml_escape(&alarm.state_reason)
1058 ));
1059 s.push_str(&format!(
1060 "<StateUpdatedTimestamp>{}</StateUpdatedTimestamp>",
1061 alarm
1062 .state_updated_timestamp
1063 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
1064 ));
1065 if let Some(m) = &alarm.metric_name {
1066 s.push_str(&format!("<MetricName>{}</MetricName>", xml_escape(m)));
1067 }
1068 if let Some(n) = &alarm.namespace {
1069 s.push_str(&format!("<Namespace>{}</Namespace>", xml_escape(n)));
1070 }
1071 if let Some(stat) = &alarm.statistic {
1072 s.push_str(&format!("<Statistic>{}</Statistic>", xml_escape(stat)));
1073 }
1074 if let Some(ext) = &alarm.extended_statistic {
1075 s.push_str(&format!(
1076 "<ExtendedStatistic>{}</ExtendedStatistic>",
1077 xml_escape(ext)
1078 ));
1079 }
1080 s.push_str(&render_dimensions(&alarm.dimensions));
1081 if let Some(p) = alarm.period {
1082 s.push_str(&format!("<Period>{p}</Period>"));
1083 }
1084 if let Some(u) = &alarm.unit {
1085 s.push_str(&format!("<Unit>{}</Unit>", xml_escape(u)));
1086 }
1087 s.push_str(&format!(
1088 "<EvaluationPeriods>{}</EvaluationPeriods>",
1089 alarm.evaluation_periods
1090 ));
1091 if let Some(d) = alarm.datapoints_to_alarm {
1092 s.push_str(&format!("<DatapointsToAlarm>{d}</DatapointsToAlarm>"));
1093 }
1094 if let Some(t) = alarm.threshold {
1095 s.push_str(&format!("<Threshold>{t}</Threshold>"));
1096 }
1097 s.push_str(&format!(
1098 "<ComparisonOperator>{}</ComparisonOperator>",
1099 xml_escape(&alarm.comparison_operator)
1100 ));
1101 if let Some(t) = &alarm.treat_missing_data {
1102 s.push_str(&format!(
1103 "<TreatMissingData>{}</TreatMissingData>",
1104 xml_escape(t)
1105 ));
1106 }
1107 if let Some(e) = &alarm.evaluate_low_sample_count_percentile {
1108 s.push_str(&format!(
1109 "<EvaluateLowSampleCountPercentile>{}</EvaluateLowSampleCountPercentile>",
1110 xml_escape(e)
1111 ));
1112 }
1113 s.push_str(&format!(
1114 "<AlarmConfigurationUpdatedTimestamp>{}</AlarmConfigurationUpdatedTimestamp>",
1115 alarm
1116 .alarm_configuration_updated_timestamp
1117 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
1118 ));
1119 s.push_str("</member>");
1120 s
1121}
1122
1123fn push_action_list(s: &mut String, name: &str, actions: &[String]) {
1124 s.push_str(&format!("<{name}>"));
1125 for action in actions {
1126 s.push_str(&format!("<member>{}</member>", xml_escape(action)));
1127 }
1128 s.push_str(&format!("</{name}>"));
1129}