use aws_sdk_sqs::operation::{
add_permission::AddPermissionInput,
change_message_visibility::ChangeMessageVisibilityInput,
change_message_visibility_batch::ChangeMessageVisibilityBatchInput,
delete_message::DeleteMessageInput,
delete_message_batch::DeleteMessageBatchInput,
delete_queue::DeleteQueueInput,
get_queue_attributes::GetQueueAttributesInput,
list_dead_letter_source_queues::ListDeadLetterSourceQueuesInput,
list_queue_tags::ListQueueTagsInput,
purge_queue::PurgeQueueInput,
receive_message::{ReceiveMessageInput, ReceiveMessageOutput},
remove_permission::RemovePermissionInput,
send_message::{SendMessageInput, SendMessageOutput},
send_message_batch::{SendMessageBatchInput, SendMessageBatchOutput},
set_queue_attributes::SetQueueAttributesInput,
tag_queue::TagQueueInput,
untag_queue::UntagQueueInput,
};
use aws_smithy_runtime_api::client::interceptors::context;
use opentelemetry_semantic_conventions::attribute as semco;
use super::super::{AttributeExtractor, SpanWrite};
/// Value written to the `semco::MESSAGING_SYSTEM` span attribute by `extract_input`.
const MESSAGING_SYSTEM_VALUE: &str = "aws_sqs";
/// Stateless extractor that records SQS-specific span attributes.
///
/// The only field is a private unit value, so code outside this module cannot
/// build the type with a struct literal and has to go through `new()` or
/// `Default::default()`.
#[derive(Default, Debug)]
pub struct SQSExtractor {
    // Zero-sized placeholder; it holds no data.
    _private: (),
}
impl SQSExtractor {
    /// Creates an extractor.
    ///
    /// Equivalent to `SQSExtractor::default()`: the type has no configurable
    /// state.
    pub fn new() -> Self {
        Self::default()
    }
}
/// Attribute extraction for SQS requests and responses.
///
/// Both hooks are best-effort: if the type-erased input or output cannot be
/// downcast to the SDK type expected for `operation`, the affected attributes
/// are skipped. Nothing in this impl panics on a type mismatch, so a telemetry
/// problem cannot abort the instrumented SDK call.
impl<SW: SpanWrite> AttributeExtractor<SW> for SQSExtractor {
    /// Records request-side attributes on `span`.
    ///
    /// Always sets `semco::MESSAGING_SYSTEM` and
    /// `semco::MESSAGING_OPERATION_NAME`. Sets
    /// `semco::MESSAGING_OPERATION_TYPE` when `operation_type` has a mapping
    /// for `operation`. For every operation whose input carries a queue URL,
    /// the URL-derived attributes are set through `set_queue_url_attrs`.
    ///
    /// * `_service` - unused; this extractor only handles SQS.
    /// * `operation` - SDK operation name, e.g. `"SendMessage"`.
    /// * `input` - type-erased SDK input for `operation`.
    /// * `span` - destination for the extracted attributes.
    fn extract_input(
        &self,
        _service: crate::interceptor::Service,
        operation: crate::interceptor::Operation,
        input: &context::Input,
        span: &mut SW,
    ) {
        span.set_attribute(semco::MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE);
        span.set_attribute(semco::MESSAGING_OPERATION_NAME, operation.to_owned());
        if let Some(op_type) = operation_type(operation) {
            span.set_attribute(semco::MESSAGING_OPERATION_TYPE, op_type);
        }

        // The SDK input types share no trait exposing `queue_url()`, so a local
        // macro stands in for a generic helper. A failed downcast yields `None`
        // instead of panicking.
        macro_rules! queue_url_of {
            ($input_ty:ty) => {
                input
                    .downcast_ref::<$input_ty>()
                    .and_then(|typed| typed.queue_url())
            };
        }

        let queue_url: Option<&str> = match operation {
            "SendMessage" => queue_url_of!(SendMessageInput),
            "SendMessageBatch" => queue_url_of!(SendMessageBatchInput),
            "ReceiveMessage" => queue_url_of!(ReceiveMessageInput),
            "DeleteMessage" => queue_url_of!(DeleteMessageInput),
            "DeleteMessageBatch" => queue_url_of!(DeleteMessageBatchInput),
            "ChangeMessageVisibility" => queue_url_of!(ChangeMessageVisibilityInput),
            "ChangeMessageVisibilityBatch" => {
                queue_url_of!(ChangeMessageVisibilityBatchInput)
            }
            "GetQueueAttributes" => queue_url_of!(GetQueueAttributesInput),
            "SetQueueAttributes" => queue_url_of!(SetQueueAttributesInput),
            "DeleteQueue" => queue_url_of!(DeleteQueueInput),
            "PurgeQueue" => queue_url_of!(PurgeQueueInput),
            "ListDeadLetterSourceQueues" => queue_url_of!(ListDeadLetterSourceQueuesInput),
            "ListQueueTags" => queue_url_of!(ListQueueTagsInput),
            "TagQueue" => queue_url_of!(TagQueueInput),
            "UntagQueue" => queue_url_of!(UntagQueueInput),
            "AddPermission" => queue_url_of!(AddPermissionInput),
            "RemovePermission" => queue_url_of!(RemovePermissionInput),
            // Any other operation: no queue URL is looked up.
            _ => None,
        };

        // `set_queue_url_attrs` does nothing when handed `None`.
        set_queue_url_attrs(span, queue_url);
    }

    /// Records response-side attributes on `span`.
    ///
    /// * `SendMessage` - sets `semco::MESSAGING_MESSAGE_ID` when the output
    ///   carries a message id.
    /// * `SendMessageBatch` - sets `semco::MESSAGING_BATCH_MESSAGE_COUNT` to
    ///   the number of successful entries.
    /// * `ReceiveMessage` - sets `semco::MESSAGING_BATCH_MESSAGE_COUNT` to the
    ///   number of returned messages.
    ///
    /// Any other operation, or an output of an unexpected type, leaves `span`
    /// untouched.
    fn extract_output(
        &self,
        _service: crate::interceptor::Service,
        operation: crate::interceptor::Operation,
        output: &context::Output,
        span: &mut SW,
    ) {
        match operation {
            "SendMessage" => {
                // Best-effort downcast, matching the batch and receive arms.
                let message_id = output
                    .downcast_ref::<SendMessageOutput>()
                    .and_then(|sent| sent.message_id());
                if let Some(message_id) = message_id {
                    span.set_attribute(semco::MESSAGING_MESSAGE_ID, message_id.to_owned());
                }
            }
            "SendMessageBatch" => {
                if let Some(output) = output.downcast_ref::<SendMessageBatchOutput>() {
                    let count = output.successful().len();
                    span.set_attribute(semco::MESSAGING_BATCH_MESSAGE_COUNT, count as i64);
                }
            }
            "ReceiveMessage" => {
                if let Some(output) = output.downcast_ref::<ReceiveMessageOutput>() {
                    let count = output.messages().len();
                    span.set_attribute(semco::MESSAGING_BATCH_MESSAGE_COUNT, count as i64);
                }
            }
            _ => {}
        }
    }
}
/// Maps an SQS operation name to the value recorded as
/// `semco::MESSAGING_OPERATION_TYPE`.
///
/// Returns `Some("send")`, `Some("receive")` or `Some("settle")` for the
/// message-level operations in the table below, and `None` for every other
/// name. Matching is exact and case-sensitive.
fn operation_type(operation: &str) -> Option<&'static str> {
    // (operation name, operation type) pairs.
    const OPERATION_TYPES: &[(&str, &str)] = &[
        ("SendMessage", "send"),
        ("SendMessageBatch", "send"),
        ("ReceiveMessage", "receive"),
        ("DeleteMessage", "settle"),
        ("DeleteMessageBatch", "settle"),
        ("ChangeMessageVisibility", "settle"),
        ("ChangeMessageVisibilityBatch", "settle"),
    ];

    OPERATION_TYPES
        .iter()
        .find(|(name, _)| *name == operation)
        .map(|&(_, kind)| kind)
}
/// Records the queue URL and, when one can be derived, the queue name.
///
/// With `queue_url == None` nothing is written. Otherwise the URL is stored
/// verbatim under `semco::AWS_SQS_QUEUE_URL`. The text after the last `/` (or
/// the whole string when it contains no `/`) is stored under
/// `semco::MESSAGING_DESTINATION_NAME`, unless that text is empty, as with a
/// URL ending in `/`.
fn set_queue_url_attrs(span: &mut impl SpanWrite, queue_url: Option<&str>) {
    let Some(url) = queue_url else {
        return;
    };
    span.set_attribute(semco::AWS_SQS_QUEUE_URL, url.to_owned());

    // Final path segment; a URL without any '/' is its own last segment.
    let last_segment = url.rsplit_once('/').map_or(url, |(_, tail)| tail);
    if !last_segment.is_empty() {
        span.set_attribute(semco::MESSAGING_DESTINATION_NAME, last_segment.to_owned());
    }
}
// Unit tests for `operation_type`, `set_queue_url_attrs` and
// `SQSExtractor::extract_output`, using an in-memory `SpanWrite` recorder.
#[cfg(test)]
mod tests {
use super::*;
use opentelemetry::Value;
use opentelemetry_semantic_conventions::attribute as semco;
use crate::span_write::{SpanWrite, Status};
/// In-memory `SpanWrite` implementation that records every attribute write
/// and the last status set, so tests can inspect them.
struct TestSpan {
// Attribute writes in call order; a key may appear more than once.
attributes: Vec<(&'static str, Value)>,
// Last status passed to `set_status`, if any.
status: Option<Status>,
}
impl TestSpan {
/// Creates a recorder with no attributes and no status.
fn new() -> Self {
Self {
attributes: vec![],
status: None,
}
}
/// Returns the most recently written value for `key`, or `None` if the
/// key was never written.
fn get(&self, key: &str) -> Option<&Value> {
self.attributes
.iter()
// Search newest-first so a later write wins over an earlier one.
.rev()
.find(|(k, _)| *k == key)
.map(|(_, v)| v)
}
}
impl SpanWrite for TestSpan {
// Appends rather than overwrites; `get` resolves duplicates.
fn set_attribute(&mut self, key: &'static str, value: impl Into<Value>) {
self.attributes.push((key, value.into()));
}
fn set_status(&mut self, code: Status) {
self.status = Some(code);
}
}
// Every operation name that has an operation-type mapping.
#[test]
fn operation_type_known_mappings() {
assert_eq!(operation_type("SendMessage"), Some("send"));
assert_eq!(operation_type("SendMessageBatch"), Some("send"));
assert_eq!(operation_type("ReceiveMessage"), Some("receive"));
assert_eq!(operation_type("DeleteMessage"), Some("settle"));
assert_eq!(operation_type("DeleteMessageBatch"), Some("settle"));
assert_eq!(operation_type("ChangeMessageVisibility"), Some("settle"));
assert_eq!(
operation_type("ChangeMessageVisibilityBatch"),
Some("settle")
);
}
// Queue-management and unknown operation names have no operation type.
#[test]
fn operation_type_no_mapping() {
assert_eq!(operation_type("CreateQueue"), None);
assert_eq!(operation_type("GetQueueUrl"), None);
assert_eq!(operation_type("ListQueues"), None);
assert_eq!(operation_type("PurgeQueue"), None);
assert_eq!(operation_type("UnknownOp"), None);
}
// Happy paths for the three operations that `extract_output` handles.
#[test]
fn extract_output_valid_outputs() {
use aws_sdk_sqs::operation::{
receive_message::ReceiveMessageOutput, send_message::SendMessageOutput,
send_message_batch::SendMessageBatchOutput,
};
use aws_sdk_sqs::types::{Message, SendMessageBatchResultEntry};
use aws_smithy_runtime_api::client::interceptors::context;
let extractor = SQSExtractor::new();
// SendMessage: the message id from the output is recorded.
let sdk_output = SendMessageOutput::builder()
.message_id("msg-abc-123")
.build();
let output = context::Output::erase(sdk_output);
let mut span = TestSpan::new();
extractor.extract_output("SQS", "SendMessage", &output, &mut span);
assert_eq!(
span.get(semco::MESSAGING_MESSAGE_ID),
Some(&Value::from("msg-abc-123"))
);
// SendMessageBatch: the batch count equals the number of successful entries.
let entry1 = SendMessageBatchResultEntry::builder()
.id("id-1")
.message_id("msg-1")
.md5_of_message_body("abc")
.build()
.unwrap();
let entry2 = SendMessageBatchResultEntry::builder()
.id("id-2")
.message_id("msg-2")
.md5_of_message_body("def")
.build()
.unwrap();
let sdk_output = SendMessageBatchOutput::builder()
.successful(entry1)
.successful(entry2)
.set_failed(Some(vec![]))
.build()
.unwrap();
let output = context::Output::erase(sdk_output);
let mut span = TestSpan::new();
extractor.extract_output("SQS", "SendMessageBatch", &output, &mut span);
assert_eq!(
span.get(semco::MESSAGING_BATCH_MESSAGE_COUNT),
Some(&Value::I64(2))
);
// ReceiveMessage: the batch count equals the number of returned messages.
let sdk_output = ReceiveMessageOutput::builder()
.messages(Message::builder().message_id("m1").build())
.messages(Message::builder().message_id("m2").build())
.messages(Message::builder().message_id("m3").build())
.build();
let output = context::Output::erase(sdk_output);
let mut span = TestSpan::new();
extractor.extract_output("SQS", "ReceiveMessage", &output, &mut span);
assert_eq!(
span.get(semco::MESSAGING_BATCH_MESSAGE_COUNT),
Some(&Value::I64(3))
);
}
// Outputs with missing or empty data, and an operation name that is not handled.
#[test]
fn extract_output_noop_and_edge_cases() {
use aws_sdk_sqs::operation::{
receive_message::ReceiveMessageOutput, send_message::SendMessageOutput,
send_message_batch::SendMessageBatchOutput,
};
use aws_smithy_runtime_api::client::interceptors::context;
let extractor = SQSExtractor::new();
// SendMessage output without a message id: the attribute is not written.
let sdk_output = SendMessageOutput::builder().build();
let output = context::Output::erase(sdk_output);
let mut span = TestSpan::new();
extractor.extract_output("SQS", "SendMessage", &output, &mut span);
assert!(span.get(semco::MESSAGING_MESSAGE_ID).is_none());
// Unhandled operation name: nothing is written at all.
let sdk_output = SendMessageOutput::builder().build();
let output = context::Output::erase(sdk_output);
let mut span = TestSpan::new();
extractor.extract_output("SQS", "UnknownOperation", &output, &mut span);
assert!(span.attributes.is_empty());
// Batch with no successful entries: a count of 0 is still recorded.
let sdk_output = SendMessageBatchOutput::builder()
.set_successful(Some(vec![]))
.set_failed(Some(vec![]))
.build()
.unwrap();
let output = context::Output::erase(sdk_output);
let mut span = TestSpan::new();
extractor.extract_output("SQS", "SendMessageBatch", &output, &mut span);
assert_eq!(
span.get(semco::MESSAGING_BATCH_MESSAGE_COUNT),
Some(&Value::I64(0))
);
// Receive with no messages set on the builder: a count of 0 is recorded.
let sdk_output = ReceiveMessageOutput::builder().build();
let output = context::Output::erase(sdk_output);
let mut span = TestSpan::new();
extractor.extract_output("SQS", "ReceiveMessage", &output, &mut span);
assert_eq!(
span.get(semco::MESSAGING_BATCH_MESSAGE_COUNT),
Some(&Value::I64(0))
);
}
// A well-formed queue URL yields both the URL and the queue-name attributes.
#[test]
fn set_queue_url_attrs_valid_url() {
let mut span = TestSpan::new();
set_queue_url_attrs(
&mut span,
Some("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"),
);
assert_eq!(
span.get(semco::AWS_SQS_QUEUE_URL),
Some(&Value::from(
"https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
))
);
assert_eq!(
span.get(semco::MESSAGING_DESTINATION_NAME),
Some(&Value::from("my-queue"))
);
}
// `None` writes nothing; a URL ending in '/' records the URL but no queue name.
#[test]
fn set_queue_url_attrs_none_and_trailing_slash() {
let mut span = TestSpan::new();
set_queue_url_attrs(&mut span, None);
assert!(span.attributes.is_empty());
let mut span = TestSpan::new();
set_queue_url_attrs(
&mut span,
Some("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue/"),
);
assert_eq!(
span.get(semco::AWS_SQS_QUEUE_URL),
Some(&Value::from(
"https://sqs.us-east-1.amazonaws.com/123456789012/my-queue/"
))
);
// The last path segment is empty, so the destination name is skipped.
assert_eq!(span.get(semco::MESSAGING_DESTINATION_NAME), None);
}
}