Skip to main content

rusticity_core/
sqs.rs

1use crate::config::AwsConfig;
2use anyhow::Result;
3
4#[derive(Clone, Debug)]
5pub struct SqsQueue {
6    pub name: String,
7    pub url: String,
8    pub queue_type: String,
9    pub created_timestamp: String,
10    pub messages_available: String,
11    pub messages_in_flight: String,
12    pub encryption: String,
13    pub content_based_deduplication: String,
14    pub last_modified_timestamp: String,
15    pub visibility_timeout: String,
16    pub message_retention_period: String,
17    pub maximum_message_size: String,
18    pub delivery_delay: String,
19    pub receive_message_wait_time: String,
20    pub high_throughput_fifo: String,
21    pub deduplication_scope: String,
22    pub fifo_throughput_limit: String,
23    pub dead_letter_queue: String,
24    pub messages_delayed: String,
25    pub redrive_allow_policy: String,
26    pub redrive_policy: String,
27    pub redrive_task_id: String,
28    pub redrive_task_start_time: String,
29    pub redrive_task_status: String,
30    pub redrive_task_percent: String,
31    pub redrive_task_destination: String,
32}
33
34#[derive(Clone, Debug)]
35pub struct LambdaTrigger {
36    pub uuid: String,
37    pub arn: String,
38    pub status: String,
39    pub last_modified: String,
40}
41
42#[derive(Clone, Debug)]
43pub struct EventBridgePipe {
44    pub name: String,
45    pub status: String,
46    pub target: String,
47    pub last_modified: String,
48}
49
50#[derive(Clone, Debug)]
51pub struct QueueTag {
52    pub key: String,
53    pub value: String,
54}
55
56pub struct SqsClient {
57    config: AwsConfig,
58}
59
60impl SqsClient {
61    pub fn new(config: AwsConfig) -> Self {
62        Self { config }
63    }
64
65    pub async fn list_lambda_triggers(&self, queue_arn: &str) -> Result<Vec<LambdaTrigger>> {
66        let lambda_client = self.config.lambda_client().await;
67
68        let response = lambda_client
69            .list_event_source_mappings()
70            .event_source_arn(queue_arn)
71            .send()
72            .await?;
73
74        let mut triggers = Vec::new();
75        if let Some(mappings) = response.event_source_mappings {
76            for mapping in mappings {
77                triggers.push(LambdaTrigger {
78                    uuid: mapping.uuid.unwrap_or_default(),
79                    arn: mapping.function_arn.unwrap_or_default(),
80                    status: mapping
81                        .state
82                        .map(|s| s.as_str().to_string())
83                        .unwrap_or_else(|| "Unknown".to_string()),
84                    last_modified: mapping
85                        .last_modified
86                        .map(|dt| dt.secs().to_string())
87                        .unwrap_or_default(),
88                });
89            }
90        }
91
92        Ok(triggers)
93    }
94
95    pub async fn list_queues(&self, prefix: &str) -> Result<Vec<SqsQueue>> {
96        let client = self.config.sqs_client().await;
97
98        let mut request = client.list_queues();
99        if !prefix.is_empty() {
100            request = request.queue_name_prefix(prefix);
101        }
102
103        let response = request.send().await?;
104        let mut queues = Vec::new();
105
106        if let Some(urls) = response.queue_urls {
107            for url in urls {
108                let attrs_response = client
109                    .get_queue_attributes()
110                    .queue_url(&url)
111                    .attribute_names(aws_sdk_sqs::types::QueueAttributeName::All)
112                    .send()
113                    .await?;
114
115                let attrs = attrs_response.attributes.unwrap_or_default();
116
117                let name = url.split('/').next_back().unwrap_or(&url).to_string();
118                let queue_type = if name.ends_with(".fifo") {
119                    "FIFO".to_string()
120                } else {
121                    "Standard".to_string()
122                };
123
124                queues.push(SqsQueue {
125                    name,
126                    url: url.clone(),
127                    queue_type,
128                    created_timestamp: attrs
129                        .get(&aws_sdk_sqs::types::QueueAttributeName::CreatedTimestamp)
130                        .map(|v| v.to_string())
131                        .unwrap_or_default(),
132                    messages_available: attrs
133                        .get(&aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages)
134                        .map(|v| v.to_string())
135                        .unwrap_or_else(|| "0".to_string()),
136                    messages_in_flight: attrs
137                        .get(&aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessagesNotVisible)
138                        .map(|v| v.to_string())
139                        .unwrap_or_else(|| "0".to_string()),
140                    encryption: if let Some(kms_key) = attrs.get(&aws_sdk_sqs::types::QueueAttributeName::KmsMasterKeyId) {
141                        if kms_key.is_empty() {
142                            "SSE-SQS".to_string()
143                        } else {
144                            "SSE-KMS".to_string()
145                        }
146                    } else if attrs.contains_key(&aws_sdk_sqs::types::QueueAttributeName::SqsManagedSseEnabled) {
147                        "SSE-SQS".to_string()
148                    } else {
149                        "-".to_string()
150                    },
151                    content_based_deduplication: attrs
152                        .get(&aws_sdk_sqs::types::QueueAttributeName::ContentBasedDeduplication)
153                        .map(|v| if v == "true" { "Enabled" } else { "Disabled" })
154                        .unwrap_or("Disabled")
155                        .to_string(),
156                    last_modified_timestamp: attrs
157                        .get(&aws_sdk_sqs::types::QueueAttributeName::LastModifiedTimestamp)
158                        .map(|v| v.to_string())
159                        .unwrap_or_default(),
160                    visibility_timeout: attrs
161                        .get(&aws_sdk_sqs::types::QueueAttributeName::VisibilityTimeout)
162                        .map(|v| v.to_string())
163                        .unwrap_or_default(),
164                    message_retention_period: attrs
165                        .get(&aws_sdk_sqs::types::QueueAttributeName::MessageRetentionPeriod)
166                        .map(|v| v.to_string())
167                        .unwrap_or_default(),
168                    maximum_message_size: attrs
169                        .get(&aws_sdk_sqs::types::QueueAttributeName::MaximumMessageSize)
170                        .map(|v| format!("{} bytes", v))
171                        .unwrap_or_default(),
172                    delivery_delay: attrs
173                        .get(&aws_sdk_sqs::types::QueueAttributeName::DelaySeconds)
174                        .map(|v| v.to_string())
175                        .unwrap_or_default(),
176                    receive_message_wait_time: attrs
177                        .get(&aws_sdk_sqs::types::QueueAttributeName::ReceiveMessageWaitTimeSeconds)
178                        .map(|v| v.to_string())
179                        .unwrap_or_default(),
180                    high_throughput_fifo: attrs
181                        .get(&aws_sdk_sqs::types::QueueAttributeName::FifoThroughputLimit)
182                        .map(|v| if v == "perMessageGroupId" { "Disabled" } else { "Enabled" })
183                        .unwrap_or("-")
184                        .to_string(),
185                    deduplication_scope: attrs
186                        .get(&aws_sdk_sqs::types::QueueAttributeName::DeduplicationScope)
187                        .map(|v| v.to_string())
188                        .unwrap_or_else(|| "-".to_string()),
189                    fifo_throughput_limit: attrs
190                        .get(&aws_sdk_sqs::types::QueueAttributeName::FifoThroughputLimit)
191                        .map(|v| v.to_string())
192                        .unwrap_or_else(|| "-".to_string()),
193                    dead_letter_queue: attrs
194                        .get(&aws_sdk_sqs::types::QueueAttributeName::RedrivePolicy)
195                        .and_then(|policy| {
196                            // Parse JSON to extract deadLetterTargetArn
197                            serde_json::from_str::<serde_json::Value>(policy)
198                                .ok()
199                                .and_then(|v| v.get("deadLetterTargetArn").and_then(|arn| arn.as_str()).map(|s| {
200                                    // Extract queue name from ARN
201                                    s.split(':').next_back().unwrap_or(s).to_string()
202                                }))
203                        })
204                        .unwrap_or_else(|| "-".to_string()),
205                    messages_delayed: attrs
206                        .get(&aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessagesDelayed)
207                        .map(|v| v.to_string())
208                        .unwrap_or_else(|| "0".to_string()),
209                    redrive_allow_policy: attrs
210                        .get(&aws_sdk_sqs::types::QueueAttributeName::RedriveAllowPolicy)
211                        .map(|policy| {
212                            // Parse JSON to extract redrivePermission
213                            serde_json::from_str::<serde_json::Value>(policy)
214                                .ok()
215                                .and_then(|v| v.get("redrivePermission").and_then(|p| p.as_str()).map(String::from))
216                                .unwrap_or_else(|| policy.to_string())
217                        })
218                        .unwrap_or_else(|| "-".to_string()),
219                    redrive_policy: attrs
220                        .get(&aws_sdk_sqs::types::QueueAttributeName::RedrivePolicy)
221                        .map(|v| v.to_string())
222                        .unwrap_or_else(|| "".to_string()),
223                    // TODO: Fetch actual redrive task data using ListMessageMoveTasksCommand
224                    redrive_task_id: "-".to_string(),
225                    redrive_task_start_time: "-".to_string(),
226                    redrive_task_status: "-".to_string(),
227                    redrive_task_percent: "-".to_string(),
228                    redrive_task_destination: "-".to_string(),
229                });
230            }
231        }
232
233        queues.sort_by(|a, b| a.name.cmp(&b.name));
234        Ok(queues)
235    }
236
237    pub async fn list_message_move_tasks(
238        &self,
239        queue_url: &str,
240    ) -> Result<(String, String, String, String, String)> {
241        let client = self.config.sqs_client().await;
242
243        match client
244            .list_message_move_tasks()
245            .source_arn(queue_url)
246            .send()
247            .await
248        {
249            Ok(response) => {
250                let results = response.results();
251                if let Some(task) = results.first() {
252                    let task_id = task.task_handle().unwrap_or("-").to_string();
253
254                    let start_time = chrono::DateTime::from_timestamp(task.started_timestamp(), 0)
255                        .map(|dt| format!("{} (UTC)", dt.format("%Y-%m-%d %H:%M:%S")))
256                        .unwrap_or_else(|| "-".to_string());
257
258                    let status = format!("{:?}", task.status());
259
260                    let moved = task.approximate_number_of_messages_moved();
261                    let total = task.approximate_number_of_messages_to_move().unwrap_or(0);
262                    let percent = if total > 0 {
263                        format!("{}%", (moved * 100) / total)
264                    } else {
265                        "-".to_string()
266                    };
267
268                    let destination = task.destination_arn().unwrap_or("-").to_string();
269
270                    return Ok((task_id, start_time, status, percent, destination));
271                }
272            }
273            Err(_) => {
274                // No active redrive tasks or error fetching
275            }
276        }
277
278        Ok((
279            "-".to_string(),
280            "-".to_string(),
281            "-".to_string(),
282            "-".to_string(),
283            "-".to_string(),
284        ))
285    }
286
287    pub async fn get_queue_arn(&self, queue_url: &str) -> Result<String> {
288        let client = self.config.sqs_client().await;
289
290        let response = client
291            .get_queue_attributes()
292            .queue_url(queue_url)
293            .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
294            .send()
295            .await?;
296
297        let arn = response
298            .attributes()
299            .and_then(|attrs| attrs.get(&aws_sdk_sqs::types::QueueAttributeName::QueueArn))
300            .map(|v| v.to_string())
301            .unwrap_or_default();
302
303        Ok(arn)
304    }
305
306    pub async fn list_pipes(&self, queue_arn: &str) -> Result<Vec<EventBridgePipe>> {
307        let pipes_client = self.config.pipes_client().await;
308
309        let response = pipes_client
310            .list_pipes()
311            .source_prefix(queue_arn)
312            .send()
313            .await?;
314
315        let mut pipes = Vec::new();
316        if let Some(pipe_list) = response.pipes {
317            for pipe in pipe_list {
318                pipes.push(EventBridgePipe {
319                    name: pipe.name.unwrap_or_default(),
320                    status: pipe
321                        .current_state
322                        .map(|s| s.as_str().to_string())
323                        .unwrap_or_default(),
324                    target: pipe.target.unwrap_or_default(),
325                    last_modified: pipe
326                        .last_modified_time
327                        .map(|dt| dt.secs().to_string())
328                        .unwrap_or_default(),
329                });
330            }
331        }
332
333        Ok(pipes)
334    }
335
336    pub async fn list_tags(&self, queue_arn: &str) -> Result<Vec<QueueTag>> {
337        let client = self.config.sqs_client().await;
338
339        let response = client.list_queue_tags().queue_url(queue_arn).send().await?;
340
341        let mut tags = Vec::new();
342        if let Some(tag_map) = response.tags {
343            for (key, value) in tag_map {
344                tags.push(QueueTag { key, value });
345            }
346        }
347
348        Ok(tags)
349    }
350
351    pub async fn get_queue_metrics(&self, queue_name: &str) -> Result<Vec<(i64, f64)>> {
352        let cw_client = self.config.cloudwatch_client().await;
353
354        let end_time = aws_smithy_types::DateTime::from_secs(
355            std::time::SystemTime::now()
356                .duration_since(std::time::UNIX_EPOCH)?
357                .as_secs() as i64,
358        );
359        let start_time = aws_smithy_types::DateTime::from_secs(
360            std::time::SystemTime::now()
361                .duration_since(std::time::UNIX_EPOCH)?
362                .as_secs() as i64
363                - 3 * 3600, // 3 hours ago
364        );
365
366        let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
367            .name("QueueName")
368            .value(queue_name)
369            .build();
370
371        let response = cw_client
372            .get_metric_statistics()
373            .namespace("AWS/SQS")
374            .metric_name("ApproximateAgeOfOldestMessage")
375            .dimensions(dimension)
376            .start_time(start_time)
377            .end_time(end_time)
378            .period(60)
379            .statistics(aws_sdk_cloudwatch::types::Statistic::Maximum)
380            .send()
381            .await?;
382
383        let mut data = Vec::new();
384        if let Some(datapoints) = response.datapoints {
385            for dp in datapoints {
386                if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.maximum) {
387                    data.push((timestamp.secs(), value));
388                }
389            }
390        }
391
392        // Sort by timestamp
393        data.sort_by_key(|(ts, _)| *ts);
394
395        Ok(data)
396    }
397
398    pub async fn get_queue_delayed_metrics(&self, queue_name: &str) -> Result<Vec<(i64, f64)>> {
399        let cw_client = self.config.cloudwatch_client().await;
400
401        let end_time = aws_smithy_types::DateTime::from_secs(
402            std::time::SystemTime::now()
403                .duration_since(std::time::UNIX_EPOCH)?
404                .as_secs() as i64,
405        );
406        let start_time = aws_smithy_types::DateTime::from_secs(
407            std::time::SystemTime::now()
408                .duration_since(std::time::UNIX_EPOCH)?
409                .as_secs() as i64
410                - 3 * 3600, // 3 hours ago
411        );
412
413        let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
414            .name("QueueName")
415            .value(queue_name)
416            .build();
417
418        let response = cw_client
419            .get_metric_statistics()
420            .namespace("AWS/SQS")
421            .metric_name("ApproximateNumberOfMessagesDelayed")
422            .dimensions(dimension)
423            .start_time(start_time)
424            .end_time(end_time)
425            .period(60)
426            .statistics(aws_sdk_cloudwatch::types::Statistic::Average)
427            .send()
428            .await?;
429
430        let mut data = Vec::new();
431        if let Some(datapoints) = response.datapoints {
432            for dp in datapoints {
433                if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.average) {
434                    data.push((timestamp.secs(), value));
435                }
436            }
437        }
438
439        // Sort by timestamp
440        data.sort_by_key(|(ts, _)| *ts);
441
442        Ok(data)
443    }
444
445    pub async fn get_queue_not_visible_metrics(&self, queue_name: &str) -> Result<Vec<(i64, f64)>> {
446        let cw_client = self.config.cloudwatch_client().await;
447
448        let end_time = aws_smithy_types::DateTime::from_secs(
449            std::time::SystemTime::now()
450                .duration_since(std::time::UNIX_EPOCH)?
451                .as_secs() as i64,
452        );
453        let start_time = aws_smithy_types::DateTime::from_secs(
454            std::time::SystemTime::now()
455                .duration_since(std::time::UNIX_EPOCH)?
456                .as_secs() as i64
457                - 3 * 3600, // 3 hours ago
458        );
459
460        let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
461            .name("QueueName")
462            .value(queue_name)
463            .build();
464
465        let response = cw_client
466            .get_metric_statistics()
467            .namespace("AWS/SQS")
468            .metric_name("ApproximateNumberOfMessagesNotVisible")
469            .dimensions(dimension)
470            .start_time(start_time)
471            .end_time(end_time)
472            .period(60)
473            .statistics(aws_sdk_cloudwatch::types::Statistic::Average)
474            .send()
475            .await?;
476
477        let mut data = Vec::new();
478        if let Some(datapoints) = response.datapoints {
479            for dp in datapoints {
480                if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.average) {
481                    data.push((timestamp.secs(), value));
482                }
483            }
484        }
485
486        // Sort by timestamp
487        data.sort_by_key(|(ts, _)| *ts);
488
489        Ok(data)
490    }
491
492    pub async fn get_queue_visible_metrics(&self, queue_name: &str) -> Result<Vec<(i64, f64)>> {
493        let cw_client = self.config.cloudwatch_client().await;
494
495        let end_time = aws_smithy_types::DateTime::from_secs(
496            std::time::SystemTime::now()
497                .duration_since(std::time::UNIX_EPOCH)?
498                .as_secs() as i64,
499        );
500        let start_time = aws_smithy_types::DateTime::from_secs(
501            std::time::SystemTime::now()
502                .duration_since(std::time::UNIX_EPOCH)?
503                .as_secs() as i64
504                - 3 * 3600,
505        );
506
507        let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
508            .name("QueueName")
509            .value(queue_name)
510            .build();
511
512        let response = cw_client
513            .get_metric_statistics()
514            .namespace("AWS/SQS")
515            .metric_name("ApproximateNumberOfMessagesVisible")
516            .dimensions(dimension)
517            .start_time(start_time)
518            .end_time(end_time)
519            .period(60)
520            .statistics(aws_sdk_cloudwatch::types::Statistic::Average)
521            .send()
522            .await?;
523
524        let mut data = Vec::new();
525        if let Some(datapoints) = response.datapoints {
526            for dp in datapoints {
527                if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.average) {
528                    data.push((timestamp.secs(), value));
529                }
530            }
531        }
532
533        data.sort_by_key(|(ts, _)| *ts);
534
535        Ok(data)
536    }
537
538    pub async fn get_queue_empty_receives_metrics(
539        &self,
540        queue_name: &str,
541    ) -> Result<Vec<(i64, f64)>> {
542        let cw_client = self.config.cloudwatch_client().await;
543
544        let end_time = aws_smithy_types::DateTime::from_secs(
545            std::time::SystemTime::now()
546                .duration_since(std::time::UNIX_EPOCH)?
547                .as_secs() as i64,
548        );
549        let start_time = aws_smithy_types::DateTime::from_secs(
550            std::time::SystemTime::now()
551                .duration_since(std::time::UNIX_EPOCH)?
552                .as_secs() as i64
553                - 3 * 3600,
554        );
555
556        let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
557            .name("QueueName")
558            .value(queue_name)
559            .build();
560
561        let response = cw_client
562            .get_metric_statistics()
563            .namespace("AWS/SQS")
564            .metric_name("NumberOfEmptyReceives")
565            .dimensions(dimension)
566            .start_time(start_time)
567            .end_time(end_time)
568            .period(60)
569            .statistics(aws_sdk_cloudwatch::types::Statistic::Sum)
570            .send()
571            .await?;
572
573        let mut data = Vec::new();
574        if let Some(datapoints) = response.datapoints {
575            for dp in datapoints {
576                if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.sum) {
577                    data.push((timestamp.secs(), value));
578                }
579            }
580        }
581
582        data.sort_by_key(|(ts, _)| *ts);
583
584        Ok(data)
585    }
586
587    pub async fn get_queue_messages_deleted_metrics(
588        &self,
589        queue_name: &str,
590    ) -> Result<Vec<(i64, f64)>> {
591        let cw_client = self.config.cloudwatch_client().await;
592
593        let end_time = aws_smithy_types::DateTime::from_secs(
594            std::time::SystemTime::now()
595                .duration_since(std::time::UNIX_EPOCH)?
596                .as_secs() as i64,
597        );
598        let start_time = aws_smithy_types::DateTime::from_secs(
599            std::time::SystemTime::now()
600                .duration_since(std::time::UNIX_EPOCH)?
601                .as_secs() as i64
602                - 3 * 3600,
603        );
604
605        let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
606            .name("QueueName")
607            .value(queue_name)
608            .build();
609
610        let response = cw_client
611            .get_metric_statistics()
612            .namespace("AWS/SQS")
613            .metric_name("NumberOfMessagesDeleted")
614            .dimensions(dimension)
615            .start_time(start_time)
616            .end_time(end_time)
617            .period(60)
618            .statistics(aws_sdk_cloudwatch::types::Statistic::Sum)
619            .send()
620            .await?;
621
622        let mut data = Vec::new();
623        if let Some(datapoints) = response.datapoints {
624            for dp in datapoints {
625                if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.sum) {
626                    data.push((timestamp.secs(), value));
627                }
628            }
629        }
630
631        data.sort_by_key(|(ts, _)| *ts);
632
633        Ok(data)
634    }
635
636    pub async fn get_queue_messages_received_metrics(
637        &self,
638        queue_name: &str,
639    ) -> Result<Vec<(i64, f64)>> {
640        let cw_client = self.config.cloudwatch_client().await;
641
642        let end_time = aws_smithy_types::DateTime::from_secs(
643            std::time::SystemTime::now()
644                .duration_since(std::time::UNIX_EPOCH)?
645                .as_secs() as i64,
646        );
647        let start_time = aws_smithy_types::DateTime::from_secs(
648            std::time::SystemTime::now()
649                .duration_since(std::time::UNIX_EPOCH)?
650                .as_secs() as i64
651                - 3 * 3600,
652        );
653
654        let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
655            .name("QueueName")
656            .value(queue_name)
657            .build();
658
659        let response = cw_client
660            .get_metric_statistics()
661            .namespace("AWS/SQS")
662            .metric_name("NumberOfMessagesReceived")
663            .dimensions(dimension)
664            .start_time(start_time)
665            .end_time(end_time)
666            .period(60)
667            .statistics(aws_sdk_cloudwatch::types::Statistic::Sum)
668            .send()
669            .await?;
670
671        let mut data = Vec::new();
672        if let Some(datapoints) = response.datapoints {
673            for dp in datapoints {
674                if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.sum) {
675                    data.push((timestamp.secs(), value));
676                }
677            }
678        }
679
680        data.sort_by_key(|(ts, _)| *ts);
681
682        Ok(data)
683    }
684
685    pub async fn get_queue_messages_sent_metrics(
686        &self,
687        queue_name: &str,
688    ) -> Result<Vec<(i64, f64)>> {
689        let cw_client = self.config.cloudwatch_client().await;
690
691        let end_time = aws_smithy_types::DateTime::from_secs(
692            std::time::SystemTime::now()
693                .duration_since(std::time::UNIX_EPOCH)?
694                .as_secs() as i64,
695        );
696        let start_time = aws_smithy_types::DateTime::from_secs(
697            std::time::SystemTime::now()
698                .duration_since(std::time::UNIX_EPOCH)?
699                .as_secs() as i64
700                - 3 * 3600,
701        );
702
703        let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
704            .name("QueueName")
705            .value(queue_name)
706            .build();
707
708        let response = cw_client
709            .get_metric_statistics()
710            .namespace("AWS/SQS")
711            .metric_name("NumberOfMessagesSent")
712            .dimensions(dimension)
713            .start_time(start_time)
714            .end_time(end_time)
715            .period(60)
716            .statistics(aws_sdk_cloudwatch::types::Statistic::Sum)
717            .send()
718            .await?;
719
720        let mut data = Vec::new();
721        if let Some(datapoints) = response.datapoints {
722            for dp in datapoints {
723                if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.sum) {
724                    data.push((timestamp.secs(), value));
725                }
726            }
727        }
728
729        data.sort_by_key(|(ts, _)| *ts);
730
731        Ok(data)
732    }
733
734    pub async fn get_queue_sent_message_size_metrics(
735        &self,
736        queue_name: &str,
737    ) -> Result<Vec<(i64, f64)>> {
738        let cw_client = self.config.cloudwatch_client().await;
739
740        let end_time = aws_smithy_types::DateTime::from_secs(
741            std::time::SystemTime::now()
742                .duration_since(std::time::UNIX_EPOCH)?
743                .as_secs() as i64,
744        );
745        let start_time = aws_smithy_types::DateTime::from_secs(
746            std::time::SystemTime::now()
747                .duration_since(std::time::UNIX_EPOCH)?
748                .as_secs() as i64
749                - 3 * 3600,
750        );
751
752        let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
753            .name("QueueName")
754            .value(queue_name)
755            .build();
756
757        let response = cw_client
758            .get_metric_statistics()
759            .namespace("AWS/SQS")
760            .metric_name("SentMessageSize")
761            .dimensions(dimension)
762            .start_time(start_time)
763            .end_time(end_time)
764            .period(60)
765            .statistics(aws_sdk_cloudwatch::types::Statistic::Average)
766            .send()
767            .await?;
768
769        let mut data = Vec::new();
770        if let Some(datapoints) = response.datapoints {
771            for dp in datapoints {
772                if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.average) {
773                    data.push((timestamp.secs(), value));
774                }
775            }
776        }
777
778        data.sort_by_key(|(ts, _)| *ts);
779
780        Ok(data)
781    }
782}