use super::{utils::*, *};
use crate::semconv;
impl<'a> AwsBuilderInstrument<'a> for PutRecordFluentBuilder {
fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
let attributes = attributes![
self.get_record()
.as_ref()
.map(|record| record.data().as_ref().len())
.as_attribute(semconv::MESSAGING_MESSAGE_BODY_SIZE)
];
FirehoseSpanBuilder::put_record(stream_name).attributes(attributes)
}
}
impl InstrumentedFluentBuilderOutput for PutRecordOutput {
fn extract_attributes(&self) -> impl IntoIterator<Item = KeyValue> {
[KeyValue::new(
semconv::MESSAGING_MESSAGE_ID,
self.record_id().to_owned(),
)]
}
}
instrument_aws_operation!(aws_sdk_firehose::operation::put_record);
impl<'a> AwsBuilderInstrument<'a> for PutRecordBatchFluentBuilder {
fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
let attributes = attributes![
self.get_records()
.as_ref()
.map(|records| records.len())
.as_attribute(semconv::MESSAGING_BATCH_MESSAGE_COUNT)
];
FirehoseSpanBuilder::put_record_batch(stream_name).attributes(attributes)
}
}
impl InstrumentedFluentBuilderOutput for PutRecordBatchOutput {
fn extract_attributes(&self) -> impl IntoIterator<Item = KeyValue> {
[KeyValue::new(
"messaging.batch.message_count.failed",
self.failed_put_count() as i64,
)]
}
}
instrument_aws_operation!(aws_sdk_firehose::operation::put_record_batch);
impl<'a> AwsBuilderInstrument<'a> for ListDeliveryStreamsFluentBuilder {
fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
FirehoseSpanBuilder::list_delivery_streams()
}
}
impl InstrumentedFluentBuilderOutput for ListDeliveryStreamsOutput {}
instrument_aws_operation!(aws_sdk_firehose::operation::list_delivery_streams);
impl<'a> AwsBuilderInstrument<'a> for CreateDeliveryStreamFluentBuilder {
fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
FirehoseSpanBuilder::create_delivery_stream(stream_name)
}
}
impl InstrumentedFluentBuilderOutput for CreateDeliveryStreamOutput {}
instrument_aws_operation!(aws_sdk_firehose::operation::create_delivery_stream);
impl<'a> AwsBuilderInstrument<'a> for DeleteDeliveryStreamFluentBuilder {
fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
FirehoseSpanBuilder::delete_delivery_stream(stream_name)
}
}
impl InstrumentedFluentBuilderOutput for DeleteDeliveryStreamOutput {}
instrument_aws_operation!(aws_sdk_firehose::operation::delete_delivery_stream);
impl<'a> AwsBuilderInstrument<'a> for DescribeDeliveryStreamFluentBuilder {
fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
FirehoseSpanBuilder::describe_delivery_stream(stream_name)
}
}
impl InstrumentedFluentBuilderOutput for DescribeDeliveryStreamOutput {}
instrument_aws_operation!(aws_sdk_firehose::operation::describe_delivery_stream);
impl<'a> AwsBuilderInstrument<'a> for ListTagsForDeliveryStreamFluentBuilder {
fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
FirehoseSpanBuilder::list_tags_for_delivery_stream(stream_name)
}
}
impl InstrumentedFluentBuilderOutput for ListTagsForDeliveryStreamOutput {}
instrument_aws_operation!(aws_sdk_firehose::operation::list_tags_for_delivery_stream);
impl<'a> AwsBuilderInstrument<'a> for TagDeliveryStreamFluentBuilder {
fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
FirehoseSpanBuilder::tag_delivery_stream(stream_name)
}
}
impl InstrumentedFluentBuilderOutput for TagDeliveryStreamOutput {}
instrument_aws_operation!(aws_sdk_firehose::operation::tag_delivery_stream);
impl<'a> AwsBuilderInstrument<'a> for UntagDeliveryStreamFluentBuilder {
fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
FirehoseSpanBuilder::untag_delivery_stream(stream_name)
}
}
impl InstrumentedFluentBuilderOutput for UntagDeliveryStreamOutput {}
instrument_aws_operation!(aws_sdk_firehose::operation::untag_delivery_stream);
impl<'a> AwsBuilderInstrument<'a> for StartDeliveryStreamEncryptionFluentBuilder {
fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
FirehoseSpanBuilder::start_delivery_stream_encryption(stream_name)
}
}
impl InstrumentedFluentBuilderOutput for StartDeliveryStreamEncryptionOutput {}
instrument_aws_operation!(aws_sdk_firehose::operation::start_delivery_stream_encryption);
impl<'a> AwsBuilderInstrument<'a> for StopDeliveryStreamEncryptionFluentBuilder {
fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
FirehoseSpanBuilder::stop_delivery_stream_encryption(stream_name)
}
}
impl InstrumentedFluentBuilderOutput for StopDeliveryStreamEncryptionOutput {}
instrument_aws_operation!(aws_sdk_firehose::operation::stop_delivery_stream_encryption);
impl<'a> AwsBuilderInstrument<'a> for UpdateDestinationFluentBuilder {
fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
FirehoseSpanBuilder::update_destination(stream_name)
}
}
impl InstrumentedFluentBuilderOutput for UpdateDestinationOutput {}
instrument_aws_operation!(aws_sdk_firehose::operation::update_destination);