telemetry_rust/middleware/aws/instrumentation/stream.rs
1use aws_smithy_async::future::pagination_stream::PaginationStream;
2use aws_smithy_types_convert::stream::{PaginationStreamExt, PaginationStreamImplStream};
3use aws_types::request_id::RequestId;
4use futures_util::Stream;
5use pin_project_lite::pin_project;
6use std::{
7 cell::Cell,
8 error::Error,
9 pin::Pin,
10 task::{Context, Poll},
11};
12
13use crate::{
14 KeyValue,
15 middleware::aws::{AwsSpan, AwsSpanBuilder},
16};
17
/// A no-op implementation of [`RequestId`] for internal use.
///
/// This is used in place of an AWS response to satisfy [`AwsSpan::end`] trait bounds,
/// because a stream only hands us its items and errors: we don't have access to the
/// real response carrying request ID information.
///
/// Its [`RequestId::request_id`] implementation always returns `None`.
struct Void;
23
24impl RequestId for Void {
25 fn request_id(&self) -> Option<&str> {
26 None
27 }
28}
29
/// Payload-free discriminant of `StreamState`.
///
/// Returned by `StreamState::kind` so that callers can branch on the current
/// state without borrowing or moving the data stored inside it.
enum StreamStateKind {
    /// The span has not been started yet.
    Waiting,
    /// The span has been started and has not been ended.
    Flowing,
    /// The span has been ended.
    Finished,
}
35
/// Span lifecycle state of an instrumented stream.
///
/// Transitions consume the state by value (`start`, `end`), so the owner has to
/// move the value out before replacing it. `Invalid` is the `Default` value that
/// is left behind by such a move; `StreamState::kind` panics if it is observed.
#[derive(Default)]
enum StreamState<'a> {
    /// Initial state: holds the boxed span builder that will be started later.
    Waiting(Box<AwsSpanBuilder<'a>>),
    /// Active state: holds the started span.
    Flowing(AwsSpan),
    /// Terminal state: the span has been ended.
    Finished,
    /// Placeholder used only as the `Default` value; never a legitimate state.
    #[default]
    Invalid,
}
44
45impl<'a> StreamState<'a> {
46 fn new(span: impl Into<AwsSpanBuilder<'a>>) -> Self {
47 let span = Into::<AwsSpanBuilder>::into(span);
48 Self::Waiting(Box::new(
49 span.attribute(KeyValue::new("aws.pagination_stream", true)),
50 ))
51 }
52
53 fn kind(&self) -> StreamStateKind {
54 match self {
55 StreamState::Waiting(_) => StreamStateKind::Waiting,
56 StreamState::Flowing(_) => StreamStateKind::Flowing,
57 StreamState::Finished => StreamStateKind::Finished,
58 StreamState::Invalid => {
59 panic!("Invalid instrumented stream state")
60 }
61 }
62 }
63
64 fn start(self) -> Self {
65 let Self::Waiting(span) = self else {
66 panic!("Instrumented stream state is not Waiting");
67 };
68 Self::Flowing(span.start())
69 }
70
71 fn end<E: RequestId + Error>(self, aws_response: &Result<Void, E>) -> Self {
72 let Self::Flowing(span) = self else {
73 panic!("Instrumented stream state is not Flowing");
74 };
75 span.end(aws_response);
76 Self::Finished
77 }
78}
79
pin_project! {
    /// A wrapper around a Stream that provides OpenTelemetry instrumentation for AWS operations.
    ///
    /// This struct automatically creates a span for the wrapped stream and manages the
    /// span lifecycle, including error handling and completion tracking.
    ///
    /// The instrumented stream automatically adds the `aws.pagination_stream = true` attribute
    /// to help identify pagination/streaming operations in traces.
    ///
    /// The instrumented stream maintains state to track the span lifecycle:
    /// - `Waiting`: Initial state with a span builder ready to start
    /// - `Flowing`: Active state with an ongoing span
    /// - `Finished`: Terminal state after the stream completes or errors
    pub struct InstrumentedStream<'a, S: Stream> {
        // The wrapped stream; structurally pinned so it can be polled in place.
        #[pin]
        inner: S,
        // Span lifecycle state. Kept in a `Cell` so the by-value state transitions
        // can be applied with `take` followed by `set`.
        state: Cell<StreamState<'a>>,
    }
}
99
100impl<T, E, S> Stream for InstrumentedStream<'_, S>
101where
102 E: RequestId + Error,
103 S: Stream<Item = Result<T, E>>,
104{
105 type Item = S::Item;
106
107 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
108 let this = self.project();
109 match this.state.get_mut().kind() {
110 StreamStateKind::Waiting => {
111 this.state.set(this.state.take().start());
112 this.inner.poll_next(cx)
113 }
114 StreamStateKind::Flowing => match this.inner.poll_next(cx) {
115 Poll::Ready(None) => {
116 this.state.set(this.state.take().end(&Ok::<_, E>(Void)));
117 Poll::Ready(None)
118 }
119 Poll::Ready(Some(Err(err))) => {
120 let aws_result = Err(err);
121 this.state.set(this.state.take().end(&aws_result));
122 Poll::Ready(aws_result.err().map(Err))
123 }
124 result => result,
125 },
126 StreamStateKind::Finished => Poll::Ready(None),
127 }
128 }
129}
130
/// A trait for adding OpenTelemetry instrumentation to AWS pagination streams.
///
/// This trait provides the `instrument` method that wraps streams with telemetry
/// capabilities, automatically creating and managing spans for AWS operations.
/// It is designed for AWS pagination streams, but it is implemented for any
/// [`TryStream`][`futures_util::TryStream`] yielding AWS SDK errors, i.e. any
/// stream of `Result<T, E>` items where `E` implements [`RequestId`] and
/// [`Error`].
///
/// All instrumented streams automatically include the `aws.pagination_stream = true`
/// attribute to help identify streaming operations in traces.
///
/// # Examples
///
/// ```rust
/// use aws_sdk_dynamodb::{Client as DynamoClient, types::AttributeValue};
/// use futures_util::TryStreamExt;
/// use telemetry_rust::{
///     KeyValue,
///     middleware::aws::{AwsStreamInstrument, DynamodbSpanBuilder},
///     semconv,
/// };
///
/// async fn query_table() -> Result<usize, Box<dyn std::error::Error>> {
///     let config = aws_config::load_from_env().await;
///     let dynamo_client = DynamoClient::new(&config);
///     let items =
///         dynamo_client
///             .query()
///             .table_name("table_name")
///             .index_name("my_index")
///             .key_condition_expression("PK = :pk")
///             .expression_attribute_values(":pk", AttributeValue::S("Test".to_string()))
///             .into_paginator()
///             .items()
///             .send()
///             .instrument(DynamodbSpanBuilder::query("table_name").attribute(
///                 KeyValue::new(semconv::AWS_DYNAMODB_INDEX_NAME, "my_index"),
///             ))
///             .try_collect::<Vec<_>>()
///             .await?;
///     println!("DynamoDB items: {items:#?}");
///     Ok(items.len())
/// }
/// ```
pub trait AwsStreamInstrument<T, E, S>
where
    E: RequestId + Error,
    S: Stream<Item = Result<T, E>>,
{
    /// Instruments the stream with OpenTelemetry tracing.
    ///
    /// This method wraps a [`TryStream`][`futures_util::TryStream`] in an [`InstrumentedStream`] that will:
    /// - Start a span when the stream begins polling
    /// - End the span with success when the stream completes normally
    /// - End the span with error information if the stream encounters an error
    ///
    /// # Arguments
    ///
    /// * `span` - The span builder, or any value convertible into an
    ///   [`AwsSpanBuilder`], to use for instrumentation
    ///
    /// # Returns
    ///
    /// An [`InstrumentedStream`] that wraps the original stream with telemetry capabilities.
    fn instrument<'a>(
        self,
        span: impl Into<AwsSpanBuilder<'a>>,
    ) -> InstrumentedStream<'a, S>;
}
198
199impl<T, E, S> AwsStreamInstrument<T, E, S> for S
200where
201 E: RequestId + Error,
202 S: Stream<Item = Result<T, E>>,
203{
204 fn instrument<'a>(
205 self,
206 span: impl Into<AwsSpanBuilder<'a>>,
207 ) -> InstrumentedStream<'a, S> {
208 InstrumentedStream {
209 inner: self,
210 state: Cell::new(StreamState::new(span)),
211 }
212 }
213}
214
215impl<T, E> AwsStreamInstrument<T, E, PaginationStreamImplStream<Result<T, E>>>
216 for PaginationStream<Result<T, E>>
217where
218 E: RequestId + Error,
219{
220 fn instrument<'a>(
221 self,
222 span: impl Into<AwsSpanBuilder<'a>>,
223 ) -> InstrumentedStream<'a, PaginationStreamImplStream<Result<T, E>>> {
224 self.into_stream_03x().instrument(span)
225 }
226}