pub const DB_SYSTEM_NAME: &str = "db.system.name";
pub const RPC_SYSTEM_NAME: &str = "rpc.system.name";
#[allow(unused)]
mod _tell_me_when_semconv_have_it {
use super::{DB_SYSTEM_NAME, RPC_SYSTEM_NAME};
use opentelemetry_semantic_conventions::attribute::*;
}
pub mod extract;
mod utils;
#[cfg(feature = "tracing-backend")]
pub mod tracing;
#[cfg(feature = "otel-backend")]
pub mod otel;
#[cfg(feature = "tracing-backend")]
pub type DefaultInterceptor = tracing::TracingInterceptor;
#[cfg(all(feature = "otel-backend", not(feature = "tracing-backend")))]
pub type DefaultInterceptor = otel::OtelInterceptor;
use aws_smithy_runtime_api::{box_error::BoxError, client::interceptors::context, http};
use aws_smithy_types::config_bag::ConfigBag;
use aws_types::{region::Region, request_id::RequestId};
use opentelemetry::trace::Status;
use opentelemetry_semantic_conventions::attribute as semco;
use utils::{AwsSdkOperation, extract_service_operation};
use crate::span_write::SpanWrite;
pub type Service<'a> = &'a str;
pub type Operation<'a> = &'a str;
pub enum ServiceFilter {
All,
Service(Service<'static>),
Operation(Service<'static>, Operation<'static>),
}
impl ServiceFilter {
fn is_match(&self, service: Service, operation: Operation) -> bool {
match self {
ServiceFilter::All => true,
ServiceFilter::Service(s) => s.eq_ignore_ascii_case(service),
ServiceFilter::Operation(s, o) => {
s.eq_ignore_ascii_case(service) && o.eq_ignore_ascii_case(operation)
}
}
}
}
pub trait AttributeExtractor<SW: SpanWrite> {
fn extract_input(
&self,
_service: Service,
_operation: Operation,
_input: &context::Input,
_span: &mut SW,
) {
}
fn extract_request(
&self,
_service: Service,
_operation: Operation,
_request: &http::Request,
_span: &mut SW,
) {
}
fn extract_response(
&self,
_service: Service,
_operation: Operation,
_response: &http::Response,
_span: &mut SW,
) {
}
fn extract_output(
&self,
_service: Service,
_operation: Operation,
_output: &context::Output,
_span: &mut SW,
) {
}
fn extract_error(
&self,
_service: Service,
_operation: Operation,
_error: &context::Error,
_span: &mut SW,
) {
}
}
type InputHook<SW> =
Box<dyn for<'a> Fn(Service<'a>, Operation<'a>, &'a context::Input, &'a mut SW) + Send + Sync>;
type RequestHook<SW> =
Box<dyn for<'a> Fn(Service<'a>, Operation<'a>, &'a http::Request, &'a mut SW) + Send + Sync>;
type ResponseHook<SW> =
Box<dyn for<'a> Fn(Service<'a>, Operation<'a>, &'a http::Response, &'a mut SW) + Send + Sync>;
type OutputHook<SW> =
Box<dyn for<'a> Fn(Service<'a>, Operation<'a>, &'a context::Output, &'a mut SW) + Send + Sync>;
type ErrorHook<SW> =
Box<dyn for<'a> Fn(Service<'a>, Operation<'a>, &'a context::Error, &'a mut SW) + Send + Sync>;
pub struct DefaultExtractor<SW: SpanWrite> {
#[cfg(feature = "extract-dynamodb")]
dynamodb_extractor: extract::dynamodb::DynamoDBExtractor,
#[cfg(feature = "extract-s3")]
s3_extractor: extract::s3::S3Extractor,
#[cfg(feature = "extract-sqs")]
sqs_extractor: extract::sqs::SQSExtractor,
custom_extractors: Vec<Box<dyn AttributeExtractor<SW> + Send + Sync>>,
input_hooks: Vec<(ServiceFilter, InputHook<SW>)>,
request_hooks: Vec<(ServiceFilter, RequestHook<SW>)>,
response_hooks: Vec<(ServiceFilter, ResponseHook<SW>)>,
output_hooks: Vec<(ServiceFilter, OutputHook<SW>)>,
error_hooks: Vec<(ServiceFilter, ErrorHook<SW>)>,
}
impl<SW: SpanWrite> core::fmt::Debug for DefaultExtractor<SW> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DefaultExtractor").finish_non_exhaustive()
}
}
impl<SW: SpanWrite> DefaultExtractor<SW> {
fn new() -> Self {
Self {
#[cfg(feature = "extract-dynamodb")]
dynamodb_extractor: extract::dynamodb::DynamoDBExtractor::new(),
#[cfg(feature = "extract-s3")]
s3_extractor: extract::s3::S3Extractor::new(),
#[cfg(feature = "extract-sqs")]
sqs_extractor: extract::sqs::SQSExtractor::new(),
custom_extractors: Vec::new(),
input_hooks: Vec::new(),
request_hooks: Vec::new(),
response_hooks: Vec::new(),
output_hooks: Vec::new(),
error_hooks: Vec::new(),
}
}
pub fn register_input_hook<H>(&mut self, filter: ServiceFilter, hook: H)
where
H: for<'a> Fn(Service<'a>, Operation<'a>, &'a context::Input, &'a mut SW),
H: Send + Sync + 'static,
{
self.input_hooks.push((filter, Box::new(hook)));
}
pub fn register_request_hook<H>(&mut self, filter: ServiceFilter, hook: H)
where
H: for<'a> Fn(Service<'a>, Operation<'a>, &'a http::Request, &'a mut SW),
H: Send + Sync + 'static,
{
self.request_hooks.push((filter, Box::new(hook)));
}
pub fn register_response_hook<H>(&mut self, filter: ServiceFilter, hook: H)
where
H: for<'a> Fn(Service<'a>, Operation<'a>, &'a http::Response, &'a mut SW),
H: Send + Sync + 'static,
{
self.response_hooks.push((filter, Box::new(hook)));
}
pub fn register_output_hook<H>(&mut self, filter: ServiceFilter, hook: H)
where
H: for<'a> Fn(Service<'a>, Operation<'a>, &'a context::Output, &'a mut SW),
H: Send + Sync + 'static,
{
self.output_hooks.push((filter, Box::new(hook)));
}
pub fn register_error_hook<H>(&mut self, filter: ServiceFilter, hook: H)
where
H: for<'a> Fn(Service<'a>, Operation<'a>, &'a context::Error, &'a mut SW),
H: Send + Sync + 'static,
{
self.error_hooks.push((filter, Box::new(hook)));
}
pub fn register_attribute_extractor<AE>(&mut self, extractor: AE)
where
AE: AttributeExtractor<SW>,
AE: Send + Sync + 'static,
{
self.custom_extractors.push(Box::new(extractor));
}
}
macro_rules! call_extractors {
($self:ident $service:ident $operation:ident $method:ident $hooks:ident $parameter:ident $span:ident) => {
match $service {
#[cfg(feature = "extract-dynamodb")]
"DynamoDB" => $self
.dynamodb_extractor
.$method($service, $operation, $parameter, $span),
#[cfg(feature = "extract-s3")]
"S3" => $self
.s3_extractor
.$method($service, $operation, $parameter, $span),
#[cfg(feature = "extract-sqs")]
"SQS" => $self
.sqs_extractor
.$method($service, $operation, $parameter, $span),
_ => {}
}
for custom_extractors in $self.custom_extractors.iter() {
custom_extractors.$method($service, $operation, $parameter, $span);
}
for hook in $self
.$hooks
.iter()
.filter_map(|(filter, hook)| filter.is_match($service, $operation).then_some(hook))
{
hook.as_ref()($service, $operation, $parameter, $span);
}
};
}
impl<SW: SpanWrite> DefaultExtractor<SW> {
fn read_before_execution(
&self,
context: &context::BeforeSerializationInterceptorContextRef<'_>,
cfg: &mut ConfigBag,
span: &mut SW,
) -> Result<(), BoxError> {
log::trace!("CFG: {:?}", cfg);
span.set_attribute(
semco::CLOUD_REGION,
cfg.load::<Region>()
.ok_or("No Region in the ConfigBag")?
.to_string(),
);
let sdk_operation = {
let (_guard, span) = {
use utils::SpanPauser;
SpanPauser::pause_until(|span| {
span.metadata()
.map(|metadata| metadata.target().contains("::operation::"))
.unwrap_or_default()
})
.ok_or(
"AWS SDK operation top-level tracing:Span not found, \
it likely means AWS changed their API, \
please contact the maintainer immediately.",
)?
};
let span_name = span
.metadata()
.ok_or("tracing::Span metadata not enabled")?
.name();
let (service, operation) = span_name.split_once('.').ok_or_else(|| {
format!(
"AWS SDK operation top-level tracing:Span name does not have \
the expected form: {span_name}, it likely means AWS changed \
their API, please contact the maintainer immediately."
)
})?;
AwsSdkOperation::new(service, operation)
};
let service = sdk_operation.service();
let operation = sdk_operation.operation();
let input = context.input();
log::trace!("INPUT: {:?}", input);
call_extractors!(self service operation extract_input input_hooks input span);
cfg.interceptor_state().store_put(sdk_operation);
Ok(())
}
fn read_after_serialization(
&self,
context: &context::BeforeTransmitInterceptorContextRef<'_>,
cfg: &mut ConfigBag,
span: &mut SW,
) -> Result<(), BoxError> {
log::trace!("CFG: {:?}", cfg);
let (service, operation) = extract_service_operation(cfg);
let request = context.request();
log::trace!("REQUEST: {:?}", request);
call_extractors!(self service operation extract_request request_hooks request span);
Ok(())
}
fn read_before_deserialization(
&self,
context: &context::BeforeDeserializationInterceptorContextRef<'_>,
cfg: &mut ConfigBag,
span: &mut SW,
) -> Result<(), BoxError> {
log::trace!("CFG: {:?}", cfg);
let (service, operation) = extract_service_operation(cfg);
let response = context.response();
log::trace!("RESPONSE: {:?}", response);
span.set_attribute(
semco::HTTP_RESPONSE_STATUS_CODE,
response.status().as_u16() as i64,
);
if let Some(req_id) = RequestId::request_id(response) {
log::trace!("REQ_ID: {req_id}");
span.set_attribute(semco::AWS_REQUEST_ID, req_id.to_owned());
}
if let Some(extended_id) = response.headers().get("x-amz-id-2") {
log::trace!("EXTENDED_REQ_ID: {extended_id}");
span.set_attribute(semco::AWS_EXTENDED_REQUEST_ID, extended_id.to_owned());
}
call_extractors!(self service operation extract_response response_hooks response span);
Ok(())
}
fn read_after_execution(
&self,
context: &context::FinalizerInterceptorContextRef<'_>,
cfg: &mut ConfigBag,
span: &mut SW,
) -> Result<(), BoxError> {
log::trace!("CFG: {:?}", cfg);
let (service, operation) = extract_service_operation(cfg);
let output_or_error = context.output_or_error();
log::trace!("OUTPUT_OR_ERROR: {:?}", output_or_error);
match output_or_error {
Some(Ok(output)) => {
call_extractors!(self service operation extract_output output_hooks output span);
}
Some(Err(orchestration_error)) => {
if let Some(op_error) = orchestration_error.as_operation_error() {
let display = format!("{op_error}");
let (error_type, message) = match display.split_once(": ") {
Some((code, msg)) => (code, msg),
None => (display.as_str(), display.as_str()),
};
log::debug!("operation error: {display}");
span.set_attribute(semco::ERROR_TYPE, error_type.to_owned());
span.set_status(Status::error(message.to_owned()));
let error = op_error;
call_extractors!(self service operation extract_error error_hooks error span);
} else if let Some(connector_error) = orchestration_error.as_connector_error() {
let message = format!("{connector_error}");
log::debug!("connector error: {message}");
span.set_attribute(semco::ERROR_TYPE, "CONNECTOR".to_owned());
span.set_status(Status::error(message));
} else if orchestration_error.is_timeout_error() {
let message = format!("{orchestration_error}");
log::debug!("timeout error: {message}");
span.set_attribute(semco::ERROR_TYPE, "TIMEOUT".to_owned());
span.set_status(Status::error(message));
} else {
let message = format!("{orchestration_error}");
log::debug!("orchestration error: {message}");
span.set_attribute(semco::ERROR_TYPE, "_OTHER".to_owned());
span.set_status(Status::error(message));
}
}
None => {
log::debug!("no output or error received");
span.set_attribute(semco::ERROR_TYPE, "_OTHER".to_owned());
span.set_status(Status::error(
"SDK completed without output or error".to_owned(),
));
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn service_filter_is_match_matches() {
assert!(ServiceFilter::All.is_match("DynamoDB", "GetItem"));
assert!(ServiceFilter::All.is_match("S3", "PutObject"));
assert!(ServiceFilter::All.is_match("", ""));
assert!(ServiceFilter::Service("DynamoDB").is_match("DynamoDB", "GetItem"));
assert!(ServiceFilter::Service("DynamoDB").is_match("dynamodb", "GetItem"));
assert!(ServiceFilter::Service("dynamodb").is_match("DynamoDB", "PutItem"));
assert!(ServiceFilter::Service("DYNAMODB").is_match("dynamodb", "Query"));
assert!(ServiceFilter::Operation("DynamoDB", "GetItem").is_match("DynamoDB", "GetItem"));
assert!(ServiceFilter::Operation("DynamoDB", "GetItem").is_match("dynamodb", "getitem"));
assert!(ServiceFilter::Operation("s3", "putobject").is_match("S3", "PutObject"));
}
#[test]
fn service_filter_is_match_no_match() {
assert!(!ServiceFilter::Service("DynamoDB").is_match("S3", "GetItem"));
assert!(!ServiceFilter::Service("DynamoDB").is_match("SQS", "SendMessage"));
assert!(!ServiceFilter::Operation("DynamoDB", "GetItem").is_match("S3", "GetItem"));
assert!(!ServiceFilter::Operation("DynamoDB", "GetItem").is_match("DynamoDB", "PutItem"));
assert!(!ServiceFilter::Operation("DynamoDB", "GetItem").is_match("S3", "PutObject"));
}
}