1use crate::config::AwsConfig;
2use anyhow::Result;
3
/// Display-oriented snapshot of one SQS queue.
///
/// Every attribute is kept as an already formatted `String`. The values are
/// produced by `SqsClient::list_queues`; attributes the service did not return
/// fall back to an empty string, `"0"` or `"-"` depending on the field.
#[derive(Debug, Clone)]
pub struct SqsQueue {
    /// Queue name: the last `/`-separated segment of `url`.
    pub name: String,
    /// Queue URL as returned by the list-queues call.
    pub url: String,
    /// `"FIFO"` when the name ends with `.fifo`, otherwise `"Standard"`.
    pub queue_type: String,
    /// Raw `CreatedTimestamp` attribute; empty when absent.
    pub created_timestamp: String,
    /// Raw `ApproximateNumberOfMessages` attribute; `"0"` when absent.
    pub messages_available: String,
    /// Raw `ApproximateNumberOfMessagesNotVisible` attribute; `"0"` when absent.
    pub messages_in_flight: String,
    /// Encryption label: `"SSE-KMS"`, `"SSE-SQS"` or `"-"`.
    pub encryption: String,
    /// `"Enabled"` or `"Disabled"`, derived from `ContentBasedDeduplication`.
    pub content_based_deduplication: String,
    /// Raw `LastModifiedTimestamp` attribute; empty when absent.
    pub last_modified_timestamp: String,
    /// Raw `VisibilityTimeout` attribute; empty when absent.
    pub visibility_timeout: String,
    /// Raw `MessageRetentionPeriod` attribute; empty when absent.
    pub message_retention_period: String,
    /// `MaximumMessageSize` attribute followed by `" bytes"`; empty when absent.
    pub maximum_message_size: String,
    /// Raw `DelaySeconds` attribute; empty when absent.
    pub delivery_delay: String,
    /// Raw `ReceiveMessageWaitTimeSeconds` attribute; empty when absent.
    pub receive_message_wait_time: String,
    /// `"Enabled"`, `"Disabled"`, or `"-"` when `FifoThroughputLimit` is absent.
    pub high_throughput_fifo: String,
    /// Raw `DeduplicationScope` attribute; `"-"` when absent.
    pub deduplication_scope: String,
    /// Raw `FifoThroughputLimit` attribute; `"-"` when absent.
    pub fifo_throughput_limit: String,
    /// Last `:`-separated segment of the redrive policy's `deadLetterTargetArn`;
    /// `"-"` when there is no usable redrive policy.
    pub dead_letter_queue: String,
    /// Raw `ApproximateNumberOfMessagesDelayed` attribute; `"0"` when absent.
    pub messages_delayed: String,
    /// `redrivePermission` taken from the `RedriveAllowPolicy` JSON, the raw
    /// attribute when that key cannot be read, or `"-"` when absent.
    pub redrive_allow_policy: String,
    /// Raw `RedrivePolicy` attribute; empty when absent.
    pub redrive_policy: String,
    /// Redrive task handle; `list_queues` stores the placeholder `"-"`.
    pub redrive_task_id: String,
    /// Redrive task start time; `list_queues` stores the placeholder `"-"`.
    pub redrive_task_start_time: String,
    /// Redrive task status; `list_queues` stores the placeholder `"-"`.
    pub redrive_task_status: String,
    /// Redrive task progress; `list_queues` stores the placeholder `"-"`.
    pub redrive_task_percent: String,
    /// Redrive task destination; `list_queues` stores the placeholder `"-"`.
    pub redrive_task_destination: String,
}
33
/// One Lambda event source mapping, as reported by
/// `SqsClient::list_lambda_triggers`.
#[derive(Debug, Clone)]
pub struct LambdaTrigger {
    /// Identifier of the event source mapping; empty when not reported.
    pub uuid: String,
    /// ARN of the Lambda function the mapping invokes; empty when not reported.
    pub arn: String,
    /// Mapping state, or `"Unknown"` when the service returned none.
    pub status: String,
    /// Last-modified time as epoch seconds rendered in decimal; empty when
    /// not reported.
    pub last_modified: String,
}
41
/// One EventBridge pipe, as reported by `SqsClient::list_pipes`.
#[derive(Debug, Clone)]
pub struct EventBridgePipe {
    /// Pipe name; empty when not reported.
    pub name: String,
    /// Current state of the pipe; empty when not reported.
    pub status: String,
    /// Target of the pipe; empty when not reported.
    pub target: String,
    /// Last-modified time as epoch seconds rendered in decimal; empty when
    /// not reported.
    pub last_modified: String,
}
49
/// A single key/value tag attached to a queue, as returned by
/// `SqsClient::list_tags`.
#[derive(Debug, Clone)]
pub struct QueueTag {
    /// Tag key.
    pub key: String,
    /// Tag value.
    pub value: String,
}
55
56pub struct SqsClient {
57 config: AwsConfig,
58}
59
60impl SqsClient {
61 pub fn new(config: AwsConfig) -> Self {
62 Self { config }
63 }
64
65 pub async fn list_lambda_triggers(&self, queue_arn: &str) -> Result<Vec<LambdaTrigger>> {
66 let lambda_client = self.config.lambda_client().await;
67
68 let response = lambda_client
69 .list_event_source_mappings()
70 .event_source_arn(queue_arn)
71 .send()
72 .await?;
73
74 let mut triggers = Vec::new();
75 if let Some(mappings) = response.event_source_mappings {
76 for mapping in mappings {
77 triggers.push(LambdaTrigger {
78 uuid: mapping.uuid.unwrap_or_default(),
79 arn: mapping.function_arn.unwrap_or_default(),
80 status: mapping
81 .state
82 .map(|s| s.as_str().to_string())
83 .unwrap_or_else(|| "Unknown".to_string()),
84 last_modified: mapping
85 .last_modified
86 .map(|dt| dt.secs().to_string())
87 .unwrap_or_default(),
88 });
89 }
90 }
91
92 Ok(triggers)
93 }
94
95 pub async fn list_queues(&self, prefix: &str) -> Result<Vec<SqsQueue>> {
96 let client = self.config.sqs_client().await;
97
98 let mut request = client.list_queues();
99 if !prefix.is_empty() {
100 request = request.queue_name_prefix(prefix);
101 }
102
103 let response = request.send().await?;
104 let mut queues = Vec::new();
105
106 if let Some(urls) = response.queue_urls {
107 for url in urls {
108 let attrs_response = client
109 .get_queue_attributes()
110 .queue_url(&url)
111 .attribute_names(aws_sdk_sqs::types::QueueAttributeName::All)
112 .send()
113 .await?;
114
115 let attrs = attrs_response.attributes.unwrap_or_default();
116
117 let name = url.split('/').next_back().unwrap_or(&url).to_string();
118 let queue_type = if name.ends_with(".fifo") {
119 "FIFO".to_string()
120 } else {
121 "Standard".to_string()
122 };
123
124 queues.push(SqsQueue {
125 name,
126 url: url.clone(),
127 queue_type,
128 created_timestamp: attrs
129 .get(&aws_sdk_sqs::types::QueueAttributeName::CreatedTimestamp)
130 .map(|v| v.to_string())
131 .unwrap_or_default(),
132 messages_available: attrs
133 .get(&aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages)
134 .map(|v| v.to_string())
135 .unwrap_or_else(|| "0".to_string()),
136 messages_in_flight: attrs
137 .get(&aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessagesNotVisible)
138 .map(|v| v.to_string())
139 .unwrap_or_else(|| "0".to_string()),
140 encryption: if let Some(kms_key) = attrs.get(&aws_sdk_sqs::types::QueueAttributeName::KmsMasterKeyId) {
141 if kms_key.is_empty() {
142 "SSE-SQS".to_string()
143 } else {
144 "SSE-KMS".to_string()
145 }
146 } else if attrs.contains_key(&aws_sdk_sqs::types::QueueAttributeName::SqsManagedSseEnabled) {
147 "SSE-SQS".to_string()
148 } else {
149 "-".to_string()
150 },
151 content_based_deduplication: attrs
152 .get(&aws_sdk_sqs::types::QueueAttributeName::ContentBasedDeduplication)
153 .map(|v| if v == "true" { "Enabled" } else { "Disabled" })
154 .unwrap_or("Disabled")
155 .to_string(),
156 last_modified_timestamp: attrs
157 .get(&aws_sdk_sqs::types::QueueAttributeName::LastModifiedTimestamp)
158 .map(|v| v.to_string())
159 .unwrap_or_default(),
160 visibility_timeout: attrs
161 .get(&aws_sdk_sqs::types::QueueAttributeName::VisibilityTimeout)
162 .map(|v| v.to_string())
163 .unwrap_or_default(),
164 message_retention_period: attrs
165 .get(&aws_sdk_sqs::types::QueueAttributeName::MessageRetentionPeriod)
166 .map(|v| v.to_string())
167 .unwrap_or_default(),
168 maximum_message_size: attrs
169 .get(&aws_sdk_sqs::types::QueueAttributeName::MaximumMessageSize)
170 .map(|v| format!("{} bytes", v))
171 .unwrap_or_default(),
172 delivery_delay: attrs
173 .get(&aws_sdk_sqs::types::QueueAttributeName::DelaySeconds)
174 .map(|v| v.to_string())
175 .unwrap_or_default(),
176 receive_message_wait_time: attrs
177 .get(&aws_sdk_sqs::types::QueueAttributeName::ReceiveMessageWaitTimeSeconds)
178 .map(|v| v.to_string())
179 .unwrap_or_default(),
180 high_throughput_fifo: attrs
181 .get(&aws_sdk_sqs::types::QueueAttributeName::FifoThroughputLimit)
182 .map(|v| if v == "perMessageGroupId" { "Disabled" } else { "Enabled" })
183 .unwrap_or("-")
184 .to_string(),
185 deduplication_scope: attrs
186 .get(&aws_sdk_sqs::types::QueueAttributeName::DeduplicationScope)
187 .map(|v| v.to_string())
188 .unwrap_or_else(|| "-".to_string()),
189 fifo_throughput_limit: attrs
190 .get(&aws_sdk_sqs::types::QueueAttributeName::FifoThroughputLimit)
191 .map(|v| v.to_string())
192 .unwrap_or_else(|| "-".to_string()),
193 dead_letter_queue: attrs
194 .get(&aws_sdk_sqs::types::QueueAttributeName::RedrivePolicy)
195 .and_then(|policy| {
196 serde_json::from_str::<serde_json::Value>(policy)
198 .ok()
199 .and_then(|v| v.get("deadLetterTargetArn").and_then(|arn| arn.as_str()).map(|s| {
200 s.split(':').next_back().unwrap_or(s).to_string()
202 }))
203 })
204 .unwrap_or_else(|| "-".to_string()),
205 messages_delayed: attrs
206 .get(&aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessagesDelayed)
207 .map(|v| v.to_string())
208 .unwrap_or_else(|| "0".to_string()),
209 redrive_allow_policy: attrs
210 .get(&aws_sdk_sqs::types::QueueAttributeName::RedriveAllowPolicy)
211 .map(|policy| {
212 serde_json::from_str::<serde_json::Value>(policy)
214 .ok()
215 .and_then(|v| v.get("redrivePermission").and_then(|p| p.as_str()).map(String::from))
216 .unwrap_or_else(|| policy.to_string())
217 })
218 .unwrap_or_else(|| "-".to_string()),
219 redrive_policy: attrs
220 .get(&aws_sdk_sqs::types::QueueAttributeName::RedrivePolicy)
221 .map(|v| v.to_string())
222 .unwrap_or_else(|| "".to_string()),
223 redrive_task_id: "-".to_string(),
225 redrive_task_start_time: "-".to_string(),
226 redrive_task_status: "-".to_string(),
227 redrive_task_percent: "-".to_string(),
228 redrive_task_destination: "-".to_string(),
229 });
230 }
231 }
232
233 queues.sort_by(|a, b| a.name.cmp(&b.name));
234 Ok(queues)
235 }
236
237 pub async fn list_message_move_tasks(
238 &self,
239 queue_url: &str,
240 ) -> Result<(String, String, String, String, String)> {
241 let client = self.config.sqs_client().await;
242
243 match client
244 .list_message_move_tasks()
245 .source_arn(queue_url)
246 .send()
247 .await
248 {
249 Ok(response) => {
250 let results = response.results();
251 if let Some(task) = results.first() {
252 let task_id = task.task_handle().unwrap_or("-").to_string();
253
254 let start_time = chrono::DateTime::from_timestamp(task.started_timestamp(), 0)
255 .map(|dt| format!("{} (UTC)", dt.format("%Y-%m-%d %H:%M:%S")))
256 .unwrap_or_else(|| "-".to_string());
257
258 let status = format!("{:?}", task.status());
259
260 let moved = task.approximate_number_of_messages_moved();
261 let total = task.approximate_number_of_messages_to_move().unwrap_or(0);
262 let percent = if total > 0 {
263 format!("{}%", (moved * 100) / total)
264 } else {
265 "-".to_string()
266 };
267
268 let destination = task.destination_arn().unwrap_or("-").to_string();
269
270 return Ok((task_id, start_time, status, percent, destination));
271 }
272 }
273 Err(_) => {
274 }
276 }
277
278 Ok((
279 "-".to_string(),
280 "-".to_string(),
281 "-".to_string(),
282 "-".to_string(),
283 "-".to_string(),
284 ))
285 }
286
287 pub async fn get_queue_arn(&self, queue_url: &str) -> Result<String> {
288 let client = self.config.sqs_client().await;
289
290 let response = client
291 .get_queue_attributes()
292 .queue_url(queue_url)
293 .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
294 .send()
295 .await?;
296
297 let arn = response
298 .attributes()
299 .and_then(|attrs| attrs.get(&aws_sdk_sqs::types::QueueAttributeName::QueueArn))
300 .map(|v| v.to_string())
301 .unwrap_or_default();
302
303 Ok(arn)
304 }
305
306 pub async fn list_pipes(&self, queue_arn: &str) -> Result<Vec<EventBridgePipe>> {
307 let pipes_client = self.config.pipes_client().await;
308
309 let response = pipes_client
310 .list_pipes()
311 .source_prefix(queue_arn)
312 .send()
313 .await?;
314
315 let mut pipes = Vec::new();
316 if let Some(pipe_list) = response.pipes {
317 for pipe in pipe_list {
318 pipes.push(EventBridgePipe {
319 name: pipe.name.unwrap_or_default(),
320 status: pipe
321 .current_state
322 .map(|s| s.as_str().to_string())
323 .unwrap_or_default(),
324 target: pipe.target.unwrap_or_default(),
325 last_modified: pipe
326 .last_modified_time
327 .map(|dt| dt.secs().to_string())
328 .unwrap_or_default(),
329 });
330 }
331 }
332
333 Ok(pipes)
334 }
335
336 pub async fn list_tags(&self, queue_arn: &str) -> Result<Vec<QueueTag>> {
337 let client = self.config.sqs_client().await;
338
339 let response = client.list_queue_tags().queue_url(queue_arn).send().await?;
340
341 let mut tags = Vec::new();
342 if let Some(tag_map) = response.tags {
343 for (key, value) in tag_map {
344 tags.push(QueueTag { key, value });
345 }
346 }
347
348 Ok(tags)
349 }
350
351 pub async fn get_queue_metrics(&self, queue_name: &str) -> Result<Vec<(i64, f64)>> {
352 let cw_client = self.config.cloudwatch_client().await;
353
354 let end_time = aws_smithy_types::DateTime::from_secs(
355 std::time::SystemTime::now()
356 .duration_since(std::time::UNIX_EPOCH)?
357 .as_secs() as i64,
358 );
359 let start_time = aws_smithy_types::DateTime::from_secs(
360 std::time::SystemTime::now()
361 .duration_since(std::time::UNIX_EPOCH)?
362 .as_secs() as i64
363 - 3 * 3600, );
365
366 let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
367 .name("QueueName")
368 .value(queue_name)
369 .build();
370
371 let response = cw_client
372 .get_metric_statistics()
373 .namespace("AWS/SQS")
374 .metric_name("ApproximateAgeOfOldestMessage")
375 .dimensions(dimension)
376 .start_time(start_time)
377 .end_time(end_time)
378 .period(60)
379 .statistics(aws_sdk_cloudwatch::types::Statistic::Maximum)
380 .send()
381 .await?;
382
383 let mut data = Vec::new();
384 if let Some(datapoints) = response.datapoints {
385 for dp in datapoints {
386 if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.maximum) {
387 data.push((timestamp.secs(), value));
388 }
389 }
390 }
391
392 data.sort_by_key(|(ts, _)| *ts);
394
395 Ok(data)
396 }
397
398 pub async fn get_queue_delayed_metrics(&self, queue_name: &str) -> Result<Vec<(i64, f64)>> {
399 let cw_client = self.config.cloudwatch_client().await;
400
401 let end_time = aws_smithy_types::DateTime::from_secs(
402 std::time::SystemTime::now()
403 .duration_since(std::time::UNIX_EPOCH)?
404 .as_secs() as i64,
405 );
406 let start_time = aws_smithy_types::DateTime::from_secs(
407 std::time::SystemTime::now()
408 .duration_since(std::time::UNIX_EPOCH)?
409 .as_secs() as i64
410 - 3 * 3600, );
412
413 let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
414 .name("QueueName")
415 .value(queue_name)
416 .build();
417
418 let response = cw_client
419 .get_metric_statistics()
420 .namespace("AWS/SQS")
421 .metric_name("ApproximateNumberOfMessagesDelayed")
422 .dimensions(dimension)
423 .start_time(start_time)
424 .end_time(end_time)
425 .period(60)
426 .statistics(aws_sdk_cloudwatch::types::Statistic::Average)
427 .send()
428 .await?;
429
430 let mut data = Vec::new();
431 if let Some(datapoints) = response.datapoints {
432 for dp in datapoints {
433 if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.average) {
434 data.push((timestamp.secs(), value));
435 }
436 }
437 }
438
439 data.sort_by_key(|(ts, _)| *ts);
441
442 Ok(data)
443 }
444
445 pub async fn get_queue_not_visible_metrics(&self, queue_name: &str) -> Result<Vec<(i64, f64)>> {
446 let cw_client = self.config.cloudwatch_client().await;
447
448 let end_time = aws_smithy_types::DateTime::from_secs(
449 std::time::SystemTime::now()
450 .duration_since(std::time::UNIX_EPOCH)?
451 .as_secs() as i64,
452 );
453 let start_time = aws_smithy_types::DateTime::from_secs(
454 std::time::SystemTime::now()
455 .duration_since(std::time::UNIX_EPOCH)?
456 .as_secs() as i64
457 - 3 * 3600, );
459
460 let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
461 .name("QueueName")
462 .value(queue_name)
463 .build();
464
465 let response = cw_client
466 .get_metric_statistics()
467 .namespace("AWS/SQS")
468 .metric_name("ApproximateNumberOfMessagesNotVisible")
469 .dimensions(dimension)
470 .start_time(start_time)
471 .end_time(end_time)
472 .period(60)
473 .statistics(aws_sdk_cloudwatch::types::Statistic::Average)
474 .send()
475 .await?;
476
477 let mut data = Vec::new();
478 if let Some(datapoints) = response.datapoints {
479 for dp in datapoints {
480 if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.average) {
481 data.push((timestamp.secs(), value));
482 }
483 }
484 }
485
486 data.sort_by_key(|(ts, _)| *ts);
488
489 Ok(data)
490 }
491
492 pub async fn get_queue_visible_metrics(&self, queue_name: &str) -> Result<Vec<(i64, f64)>> {
493 let cw_client = self.config.cloudwatch_client().await;
494
495 let end_time = aws_smithy_types::DateTime::from_secs(
496 std::time::SystemTime::now()
497 .duration_since(std::time::UNIX_EPOCH)?
498 .as_secs() as i64,
499 );
500 let start_time = aws_smithy_types::DateTime::from_secs(
501 std::time::SystemTime::now()
502 .duration_since(std::time::UNIX_EPOCH)?
503 .as_secs() as i64
504 - 3 * 3600,
505 );
506
507 let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
508 .name("QueueName")
509 .value(queue_name)
510 .build();
511
512 let response = cw_client
513 .get_metric_statistics()
514 .namespace("AWS/SQS")
515 .metric_name("ApproximateNumberOfMessagesVisible")
516 .dimensions(dimension)
517 .start_time(start_time)
518 .end_time(end_time)
519 .period(60)
520 .statistics(aws_sdk_cloudwatch::types::Statistic::Average)
521 .send()
522 .await?;
523
524 let mut data = Vec::new();
525 if let Some(datapoints) = response.datapoints {
526 for dp in datapoints {
527 if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.average) {
528 data.push((timestamp.secs(), value));
529 }
530 }
531 }
532
533 data.sort_by_key(|(ts, _)| *ts);
534
535 Ok(data)
536 }
537
538 pub async fn get_queue_empty_receives_metrics(
539 &self,
540 queue_name: &str,
541 ) -> Result<Vec<(i64, f64)>> {
542 let cw_client = self.config.cloudwatch_client().await;
543
544 let end_time = aws_smithy_types::DateTime::from_secs(
545 std::time::SystemTime::now()
546 .duration_since(std::time::UNIX_EPOCH)?
547 .as_secs() as i64,
548 );
549 let start_time = aws_smithy_types::DateTime::from_secs(
550 std::time::SystemTime::now()
551 .duration_since(std::time::UNIX_EPOCH)?
552 .as_secs() as i64
553 - 3 * 3600,
554 );
555
556 let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
557 .name("QueueName")
558 .value(queue_name)
559 .build();
560
561 let response = cw_client
562 .get_metric_statistics()
563 .namespace("AWS/SQS")
564 .metric_name("NumberOfEmptyReceives")
565 .dimensions(dimension)
566 .start_time(start_time)
567 .end_time(end_time)
568 .period(60)
569 .statistics(aws_sdk_cloudwatch::types::Statistic::Sum)
570 .send()
571 .await?;
572
573 let mut data = Vec::new();
574 if let Some(datapoints) = response.datapoints {
575 for dp in datapoints {
576 if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.sum) {
577 data.push((timestamp.secs(), value));
578 }
579 }
580 }
581
582 data.sort_by_key(|(ts, _)| *ts);
583
584 Ok(data)
585 }
586
587 pub async fn get_queue_messages_deleted_metrics(
588 &self,
589 queue_name: &str,
590 ) -> Result<Vec<(i64, f64)>> {
591 let cw_client = self.config.cloudwatch_client().await;
592
593 let end_time = aws_smithy_types::DateTime::from_secs(
594 std::time::SystemTime::now()
595 .duration_since(std::time::UNIX_EPOCH)?
596 .as_secs() as i64,
597 );
598 let start_time = aws_smithy_types::DateTime::from_secs(
599 std::time::SystemTime::now()
600 .duration_since(std::time::UNIX_EPOCH)?
601 .as_secs() as i64
602 - 3 * 3600,
603 );
604
605 let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
606 .name("QueueName")
607 .value(queue_name)
608 .build();
609
610 let response = cw_client
611 .get_metric_statistics()
612 .namespace("AWS/SQS")
613 .metric_name("NumberOfMessagesDeleted")
614 .dimensions(dimension)
615 .start_time(start_time)
616 .end_time(end_time)
617 .period(60)
618 .statistics(aws_sdk_cloudwatch::types::Statistic::Sum)
619 .send()
620 .await?;
621
622 let mut data = Vec::new();
623 if let Some(datapoints) = response.datapoints {
624 for dp in datapoints {
625 if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.sum) {
626 data.push((timestamp.secs(), value));
627 }
628 }
629 }
630
631 data.sort_by_key(|(ts, _)| *ts);
632
633 Ok(data)
634 }
635
636 pub async fn get_queue_messages_received_metrics(
637 &self,
638 queue_name: &str,
639 ) -> Result<Vec<(i64, f64)>> {
640 let cw_client = self.config.cloudwatch_client().await;
641
642 let end_time = aws_smithy_types::DateTime::from_secs(
643 std::time::SystemTime::now()
644 .duration_since(std::time::UNIX_EPOCH)?
645 .as_secs() as i64,
646 );
647 let start_time = aws_smithy_types::DateTime::from_secs(
648 std::time::SystemTime::now()
649 .duration_since(std::time::UNIX_EPOCH)?
650 .as_secs() as i64
651 - 3 * 3600,
652 );
653
654 let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
655 .name("QueueName")
656 .value(queue_name)
657 .build();
658
659 let response = cw_client
660 .get_metric_statistics()
661 .namespace("AWS/SQS")
662 .metric_name("NumberOfMessagesReceived")
663 .dimensions(dimension)
664 .start_time(start_time)
665 .end_time(end_time)
666 .period(60)
667 .statistics(aws_sdk_cloudwatch::types::Statistic::Sum)
668 .send()
669 .await?;
670
671 let mut data = Vec::new();
672 if let Some(datapoints) = response.datapoints {
673 for dp in datapoints {
674 if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.sum) {
675 data.push((timestamp.secs(), value));
676 }
677 }
678 }
679
680 data.sort_by_key(|(ts, _)| *ts);
681
682 Ok(data)
683 }
684
685 pub async fn get_queue_messages_sent_metrics(
686 &self,
687 queue_name: &str,
688 ) -> Result<Vec<(i64, f64)>> {
689 let cw_client = self.config.cloudwatch_client().await;
690
691 let end_time = aws_smithy_types::DateTime::from_secs(
692 std::time::SystemTime::now()
693 .duration_since(std::time::UNIX_EPOCH)?
694 .as_secs() as i64,
695 );
696 let start_time = aws_smithy_types::DateTime::from_secs(
697 std::time::SystemTime::now()
698 .duration_since(std::time::UNIX_EPOCH)?
699 .as_secs() as i64
700 - 3 * 3600,
701 );
702
703 let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
704 .name("QueueName")
705 .value(queue_name)
706 .build();
707
708 let response = cw_client
709 .get_metric_statistics()
710 .namespace("AWS/SQS")
711 .metric_name("NumberOfMessagesSent")
712 .dimensions(dimension)
713 .start_time(start_time)
714 .end_time(end_time)
715 .period(60)
716 .statistics(aws_sdk_cloudwatch::types::Statistic::Sum)
717 .send()
718 .await?;
719
720 let mut data = Vec::new();
721 if let Some(datapoints) = response.datapoints {
722 for dp in datapoints {
723 if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.sum) {
724 data.push((timestamp.secs(), value));
725 }
726 }
727 }
728
729 data.sort_by_key(|(ts, _)| *ts);
730
731 Ok(data)
732 }
733
734 pub async fn get_queue_sent_message_size_metrics(
735 &self,
736 queue_name: &str,
737 ) -> Result<Vec<(i64, f64)>> {
738 let cw_client = self.config.cloudwatch_client().await;
739
740 let end_time = aws_smithy_types::DateTime::from_secs(
741 std::time::SystemTime::now()
742 .duration_since(std::time::UNIX_EPOCH)?
743 .as_secs() as i64,
744 );
745 let start_time = aws_smithy_types::DateTime::from_secs(
746 std::time::SystemTime::now()
747 .duration_since(std::time::UNIX_EPOCH)?
748 .as_secs() as i64
749 - 3 * 3600,
750 );
751
752 let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
753 .name("QueueName")
754 .value(queue_name)
755 .build();
756
757 let response = cw_client
758 .get_metric_statistics()
759 .namespace("AWS/SQS")
760 .metric_name("SentMessageSize")
761 .dimensions(dimension)
762 .start_time(start_time)
763 .end_time(end_time)
764 .period(60)
765 .statistics(aws_sdk_cloudwatch::types::Statistic::Average)
766 .send()
767 .await?;
768
769 let mut data = Vec::new();
770 if let Some(datapoints) = response.datapoints {
771 for dp in datapoints {
772 if let (Some(timestamp), Some(value)) = (dp.timestamp, dp.average) {
773 data.push((timestamp.secs(), value));
774 }
775 }
776 }
777
778 data.sort_by_key(|(ts, _)| *ts);
779
780 Ok(data)
781 }
782}