// telemetry_rust/middleware/aws/instrumentation/fluent_builder/firehose.rs
use super::{utils::*, *};
use crate::semconv;
5
6impl<'a> AwsBuilderInstrument<'a> for PutRecordFluentBuilder {
8 fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
9 let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
10 let attributes = attributes![
11 self.get_record()
12 .as_ref()
13 .map(|record| record.data().as_ref().len())
14 .as_attribute(semconv::MESSAGING_MESSAGE_BODY_SIZE)
15 ];
16 FirehoseSpanBuilder::put_record(stream_name).attributes(attributes)
17 }
18}
19impl InstrumentedFluentBuilderOutput for PutRecordOutput {
20 fn extract_attributes(&self) -> impl IntoIterator<Item = KeyValue> {
21 [KeyValue::new(
22 semconv::MESSAGING_MESSAGE_ID,
23 self.record_id().to_owned(),
24 )]
25 }
26}
27instrument_aws_operation!(aws_sdk_firehose::operation::put_record);
28
29impl<'a> AwsBuilderInstrument<'a> for PutRecordBatchFluentBuilder {
30 fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
31 let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
32 let attributes = attributes![
33 self.get_records()
34 .as_ref()
35 .map(|records| records.len())
36 .as_attribute(semconv::MESSAGING_BATCH_MESSAGE_COUNT)
37 ];
38 FirehoseSpanBuilder::put_record_batch(stream_name).attributes(attributes)
39 }
40}
41impl InstrumentedFluentBuilderOutput for PutRecordBatchOutput {
42 fn extract_attributes(&self) -> impl IntoIterator<Item = KeyValue> {
43 [KeyValue::new(
44 "messaging.batch.message_count.failed",
45 self.failed_put_count() as i64,
46 )]
47 }
48}
49instrument_aws_operation!(aws_sdk_firehose::operation::put_record_batch);
50
51impl<'a> AwsBuilderInstrument<'a> for ListDeliveryStreamsFluentBuilder {
53 fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
54 FirehoseSpanBuilder::list_delivery_streams()
55 }
56}
57impl InstrumentedFluentBuilderOutput for ListDeliveryStreamsOutput {}
58instrument_aws_operation!(aws_sdk_firehose::operation::list_delivery_streams);
59
60impl<'a> AwsBuilderInstrument<'a> for CreateDeliveryStreamFluentBuilder {
62 fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
63 let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
64 FirehoseSpanBuilder::create_delivery_stream(stream_name)
65 }
66}
67impl InstrumentedFluentBuilderOutput for CreateDeliveryStreamOutput {}
68instrument_aws_operation!(aws_sdk_firehose::operation::create_delivery_stream);
69
70impl<'a> AwsBuilderInstrument<'a> for DeleteDeliveryStreamFluentBuilder {
71 fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
72 let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
73 FirehoseSpanBuilder::delete_delivery_stream(stream_name)
74 }
75}
76impl InstrumentedFluentBuilderOutput for DeleteDeliveryStreamOutput {}
77instrument_aws_operation!(aws_sdk_firehose::operation::delete_delivery_stream);
78
79impl<'a> AwsBuilderInstrument<'a> for DescribeDeliveryStreamFluentBuilder {
80 fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
81 let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
82 FirehoseSpanBuilder::describe_delivery_stream(stream_name)
83 }
84}
85impl InstrumentedFluentBuilderOutput for DescribeDeliveryStreamOutput {}
86instrument_aws_operation!(aws_sdk_firehose::operation::describe_delivery_stream);
87
88impl<'a> AwsBuilderInstrument<'a> for ListTagsForDeliveryStreamFluentBuilder {
90 fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
91 let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
92 FirehoseSpanBuilder::list_tags_for_delivery_stream(stream_name)
93 }
94}
95impl InstrumentedFluentBuilderOutput for ListTagsForDeliveryStreamOutput {}
96instrument_aws_operation!(aws_sdk_firehose::operation::list_tags_for_delivery_stream);
97
98impl<'a> AwsBuilderInstrument<'a> for TagDeliveryStreamFluentBuilder {
99 fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
100 let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
101 FirehoseSpanBuilder::tag_delivery_stream(stream_name)
102 }
103}
104impl InstrumentedFluentBuilderOutput for TagDeliveryStreamOutput {}
105instrument_aws_operation!(aws_sdk_firehose::operation::tag_delivery_stream);
106
107impl<'a> AwsBuilderInstrument<'a> for UntagDeliveryStreamFluentBuilder {
108 fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
109 let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
110 FirehoseSpanBuilder::untag_delivery_stream(stream_name)
111 }
112}
113impl InstrumentedFluentBuilderOutput for UntagDeliveryStreamOutput {}
114instrument_aws_operation!(aws_sdk_firehose::operation::untag_delivery_stream);
115
116impl<'a> AwsBuilderInstrument<'a> for StartDeliveryStreamEncryptionFluentBuilder {
118 fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
119 let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
120 FirehoseSpanBuilder::start_delivery_stream_encryption(stream_name)
121 }
122}
123impl InstrumentedFluentBuilderOutput for StartDeliveryStreamEncryptionOutput {}
124instrument_aws_operation!(aws_sdk_firehose::operation::start_delivery_stream_encryption);
125
126impl<'a> AwsBuilderInstrument<'a> for StopDeliveryStreamEncryptionFluentBuilder {
127 fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
128 let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
129 FirehoseSpanBuilder::stop_delivery_stream_encryption(stream_name)
130 }
131}
132impl InstrumentedFluentBuilderOutput for StopDeliveryStreamEncryptionOutput {}
133instrument_aws_operation!(aws_sdk_firehose::operation::stop_delivery_stream_encryption);
134
135impl<'a> AwsBuilderInstrument<'a> for UpdateDestinationFluentBuilder {
137 fn build_aws_span(&self) -> AwsSpanBuilder<'a> {
138 let stream_name = self.get_delivery_stream_name().clone().unwrap_or_default();
139 FirehoseSpanBuilder::update_destination(stream_name)
140 }
141}
142impl InstrumentedFluentBuilderOutput for UpdateDestinationOutput {}
143instrument_aws_operation!(aws_sdk_firehose::operation::update_destination);