telemetry_rust/middleware/aws/instrumentation/stream.rs
1use aws_smithy_async::future::pagination_stream::PaginationStream;
2use aws_smithy_types::error::metadata::ProvideErrorMetadata;
3use aws_smithy_types_convert::stream::{PaginationStreamExt, PaginationStreamImplStream};
4use aws_types::request_id::RequestId;
5use futures_util::Stream;
6use pin_project_lite::pin_project;
7use std::{
8 cell::Cell,
9 error::Error,
10 pin::Pin,
11 task::{Context, Poll},
12};
13
14use crate::{
15 KeyValue,
16 middleware::aws::{AwsSpan, AwsSpanBuilder},
17};
18
19/// A no-op implementation of [`RequestId`] for internal use.
20///
21/// This is used in place of an AWS response to satisfy [`AwsSpan::end`] trait bounds,
22/// because we don't have access to the real response with request ID information.
23struct Void;
24
25impl RequestId for Void {
26 fn request_id(&self) -> Option<&str> {
27 None
28 }
29}
30
/// Payload-free tag describing which phase a [`StreamState`] is in.
///
/// Lets callers branch on the current phase without borrowing or moving the
/// span data stored inside the state itself.
enum StreamStateKind {
    /// The span has not been started yet.
    Waiting,
    /// The span is active and the inner stream is being polled.
    Flowing,
    /// The span has been ended; the stream is exhausted.
    Finished,
}
36
37#[derive(Default)]
38enum StreamState<'a> {
39 Waiting(Box<AwsSpanBuilder<'a>>),
40 Flowing(AwsSpan),
41 Finished,
42 #[default]
43 Invalid,
44}
45
46impl<'a> StreamState<'a> {
47 fn new(span: impl Into<AwsSpanBuilder<'a>>) -> Self {
48 let span = Into::<AwsSpanBuilder>::into(span);
49 Self::Waiting(Box::new(
50 span.attribute(KeyValue::new("aws.pagination_stream", true)),
51 ))
52 }
53
54 fn kind(&self) -> StreamStateKind {
55 match self {
56 StreamState::Waiting(_) => StreamStateKind::Waiting,
57 StreamState::Flowing(_) => StreamStateKind::Flowing,
58 StreamState::Finished => StreamStateKind::Finished,
59 StreamState::Invalid => {
60 panic!("Invalid instrumented stream state")
61 }
62 }
63 }
64
65 fn start(self) -> Self {
66 let Self::Waiting(span) = self else {
67 panic!("Instrumented stream state is not Waiting");
68 };
69 Self::Flowing(span.start())
70 }
71
72 fn end<E: RequestId + ProvideErrorMetadata + Error>(
73 self,
74 aws_response: &Result<Void, E>,
75 ) -> Self {
76 let Self::Flowing(span) = self else {
77 panic!("Instrumented stream state is not Flowing");
78 };
79 span.end(aws_response);
80 Self::Finished
81 }
82}
83
84pin_project! {
85 /// A wrapper around a Stream that provides OpenTelemetry instrumentation for AWS operations.
86 ///
87 /// This struct automatically creates spans for stream operations and handles proper
88 /// span lifecycle management including error handling and completion tracking.
89 ///
90 /// The instrumented stream automatically adds the `aws.pagination_stream = true` attribute
91 /// to help identify pagination/streaming operations in traces.
92 ///
93 /// The instrumented stream maintains state to track the span lifecycle:
94 /// - `Waiting`: Initial state with a span builder ready to start
95 /// - `Flowing`: Active state with an ongoing span
96 /// - `Finished`: Terminal state after the stream completes or errors
97 pub struct InstrumentedStream<'a, S: Stream> {
98 #[pin]
99 inner: S,
100 state: Cell<StreamState<'a>>,
101 }
102}
103
104impl<T, E, S> Stream for InstrumentedStream<'_, S>
105where
106 E: RequestId + ProvideErrorMetadata + Error,
107 S: Stream<Item = Result<T, E>>,
108{
109 type Item = S::Item;
110
111 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112 let this = self.project();
113 match this.state.get_mut().kind() {
114 StreamStateKind::Waiting => {
115 this.state.set(this.state.take().start());
116 this.inner.poll_next(cx)
117 }
118 StreamStateKind::Flowing => match this.inner.poll_next(cx) {
119 Poll::Ready(None) => {
120 this.state.set(this.state.take().end(&Ok::<_, E>(Void)));
121 Poll::Ready(None)
122 }
123 Poll::Ready(Some(Err(err))) => {
124 let aws_result = Err(err);
125 this.state.set(this.state.take().end(&aws_result));
126 Poll::Ready(aws_result.err().map(Err))
127 }
128 result => result,
129 },
130 StreamStateKind::Finished => Poll::Ready(None),
131 }
132 }
133}
134
135/// A trait for adding OpenTelemetry instrumentation to AWS pagination streams.
136///
137/// This trait provides the `instrument` method that wraps streams with telemetry
138/// capabilities, automatically creating and managing spans for AWS operations.
139/// It is designed for AWS pagination streams, but it is implemented for any
140/// [`TryStream`][`futures_util::TryStream`] yielding AWS SDK errors.
141///
142/// The entire stream gets represented by a single [`AwsSpan`] regardless of the number
143/// of AWS SDK requests sent by the SDK to produce it.
144///
145/// All instrumented streams automatically include the `aws.pagination_stream = true`
146/// attribute to help identify streaming operations in traces.
147///
148/// # Integration with [`AwsBuilderInstrument`][`crate::middleware::aws::AwsBuilderInstrument`]
149///
150/// The recommended pattern is to use [`AwsBuilderInstrument::build_aws_span`][crate::middleware::aws::AwsBuilderInstrument::build_aws_span] to capture
151/// all the operation parameters before converting fluent builder into a paginator stream.
152///
153/// # Examples
154///
155/// ```rust
156/// use aws_sdk_dynamodb::{Client as DynamoClient, types::AttributeValue};
157/// use futures_util::TryStreamExt;
158/// use telemetry_rust::middleware::aws::{AwsBuilderInstrument, AwsStreamInstrument};
159///
160/// async fn query_table() -> Result<usize, Box<dyn std::error::Error>> {
161/// let config = aws_config::load_from_env().await;
162/// let dynamo_client = DynamoClient::new(&config);
163///
164/// // Build the query with all parameters
165/// let query = dynamo_client
166/// .query()
167/// .table_name("table_name")
168/// .index_name("my_index")
169/// .key_condition_expression("PK = :pk")
170/// .expression_attribute_values(":pk", AttributeValue::S("Test".to_string()));
171///
172/// // Extract span from fluent builder (includes all input attributes)
173/// let span = query.build_aws_span();
174///
175/// // Use the span to instrument the paginator stream.
176/// let items = query
177/// .into_paginator()
178/// .items()
179/// .send()
180/// .instrument(span)
181/// .try_collect::<Vec<_>>()
182/// .await?;
183///
184/// println!("DynamoDB items: {items:#?}");
185/// Ok(items.len())
186/// }
187/// ```
188pub trait AwsStreamInstrument<T, E, S>
189where
190 E: RequestId + ProvideErrorMetadata + Error,
191 S: Stream<Item = Result<T, E>>,
192{
193 /// Instruments the stream with OpenTelemetry tracing.
194 ///
195 /// This method wraps a [`TryStream`][`futures_util::TryStream`] in an [`InstrumentedStream`] that will:
196 /// - Start a span when the stream begins polling
197 /// - End the span with success when the stream completes normally
198 /// - End the span with error information if the stream encounters an error
199 ///
200 /// # Arguments
201 ///
202 /// * `span` - The span builder or span configuration to use for instrumentation
203 ///
204 /// # Returns
205 ///
206 /// An [`InstrumentedStream`] that wraps the original stream with telemetry capabilities.
207 fn instrument<'a>(
208 self,
209 span: impl Into<AwsSpanBuilder<'a>>,
210 ) -> InstrumentedStream<'a, S>;
211}
212
213impl<T, E, S> AwsStreamInstrument<T, E, S> for S
214where
215 E: RequestId + ProvideErrorMetadata + Error,
216 S: Stream<Item = Result<T, E>>,
217{
218 fn instrument<'a>(
219 self,
220 span: impl Into<AwsSpanBuilder<'a>>,
221 ) -> InstrumentedStream<'a, S> {
222 InstrumentedStream {
223 inner: self,
224 state: Cell::new(StreamState::new(span)),
225 }
226 }
227}
228
229impl<T, E> AwsStreamInstrument<T, E, PaginationStreamImplStream<Result<T, E>>>
230 for PaginationStream<Result<T, E>>
231where
232 E: RequestId + ProvideErrorMetadata + Error,
233{
234 fn instrument<'a>(
235 self,
236 span: impl Into<AwsSpanBuilder<'a>>,
237 ) -> InstrumentedStream<'a, PaginationStreamImplStream<Result<T, E>>> {
238 self.into_stream_03x().instrument(span)
239 }
240}