telemetry_rust/middleware/aws/instrumentation/stream.rs
1use aws_smithy_async::future::pagination_stream::PaginationStream;
2use aws_smithy_types_convert::stream::{PaginationStreamExt, PaginationStreamImplStream};
3use aws_types::request_id::RequestId;
4use futures_util::Stream;
5use pin_project_lite::pin_project;
6use std::{
7 cell::Cell,
8 error::Error,
9 pin::Pin,
10 task::{Context, Poll},
11};
12
13use crate::{
14 KeyValue,
15 middleware::aws::{AwsSpan, AwsSpanBuilder},
16};
17
18/// A no-op implementation of [`RequestId`] for internal use.
19///
20/// This is used in place of an AWS response to satisfy [`AwsSpan::end`] trait bounds,
21/// because we don't have access to the real response with request ID information.
22struct Void;
23
24impl RequestId for Void {
25 fn request_id(&self) -> Option<&str> {
26 None
27 }
28}
29
30enum StreamStateKind {
31 Waiting,
32 Flowing,
33 Finished,
34}
35
36#[derive(Default)]
37enum StreamState<'a> {
38 Waiting(Box<AwsSpanBuilder<'a>>),
39 Flowing(AwsSpan),
40 Finished,
41 #[default]
42 Invalid,
43}
44
45impl<'a> StreamState<'a> {
46 fn new(span: impl Into<AwsSpanBuilder<'a>>) -> Self {
47 let span = Into::<AwsSpanBuilder>::into(span);
48 Self::Waiting(Box::new(
49 span.attribute(KeyValue::new("aws.pagination_stream", true)),
50 ))
51 }
52
53 fn kind(&self) -> StreamStateKind {
54 match self {
55 StreamState::Waiting(_) => StreamStateKind::Waiting,
56 StreamState::Flowing(_) => StreamStateKind::Flowing,
57 StreamState::Finished => StreamStateKind::Finished,
58 StreamState::Invalid => {
59 panic!("Invalid instrumented stream state")
60 }
61 }
62 }
63
64 fn start(self) -> Self {
65 let Self::Waiting(span) = self else {
66 panic!("Instrumented stream state is not Waiting");
67 };
68 Self::Flowing(span.start())
69 }
70
71 fn end<E: RequestId + Error>(self, aws_response: &Result<Void, E>) -> Self {
72 let Self::Flowing(span) = self else {
73 panic!("Instrumented stream state is not Flowing");
74 };
75 span.end(aws_response);
76 Self::Finished
77 }
78}
79
80pin_project! {
81 /// A wrapper around a Stream that provides OpenTelemetry instrumentation for AWS operations.
82 ///
83 /// This struct automatically creates spans for stream operations and handles proper
84 /// span lifecycle management including error handling and completion tracking.
85 ///
86 /// The instrumented stream automatically adds the `aws.pagination_stream = true` attribute
87 /// to help identify pagination/streaming operations in traces.
88 ///
89 /// The instrumented stream maintains state to track the span lifecycle:
90 /// - `Waiting`: Initial state with a span builder ready to start
91 /// - `Flowing`: Active state with an ongoing span
92 /// - `Finished`: Terminal state after the stream completes or errors
93 pub struct InstrumentedStream<'a, S: Stream> {
94 #[pin]
95 inner: S,
96 state: Cell<StreamState<'a>>,
97 }
98}
99
100impl<T, E, S> Stream for InstrumentedStream<'_, S>
101where
102 E: RequestId + Error,
103 S: Stream<Item = Result<T, E>>,
104{
105 type Item = S::Item;
106
107 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
108 let this = self.project();
109 match this.state.get_mut().kind() {
110 StreamStateKind::Waiting => {
111 this.state.set(this.state.take().start());
112 this.inner.poll_next(cx)
113 }
114 StreamStateKind::Flowing => match this.inner.poll_next(cx) {
115 Poll::Ready(None) => {
116 this.state.set(this.state.take().end(&Ok::<_, E>(Void)));
117 Poll::Ready(None)
118 }
119 Poll::Ready(Some(Err(err))) => {
120 let aws_result = Err(err);
121 this.state.set(this.state.take().end(&aws_result));
122 Poll::Ready(aws_result.err().map(Err))
123 }
124 result => result,
125 },
126 StreamStateKind::Finished => Poll::Ready(None),
127 }
128 }
129}
130
131/// A trait for adding OpenTelemetry instrumentation to AWS pagination streams.
132///
133/// This trait provides the `instrument` method that wraps streams with telemetry
134/// capabilities, automatically creating and managing spans for AWS operations.
135/// It is designed for AWS pagination streams, but it is implemented for any
136/// [`TryStream`][`futures_util::TryStream`] yielding AWS SDK errors.
137///
138/// The entire stream gets represented by a single [`AwsSpan`] regardless of the number
139/// of AWS SDK requests sent by the SDK to produce it.
140///
141/// All instrumented streams automatically include the `aws.pagination_stream = true`
142/// attribute to help identify streaming operations in traces.
143///
144/// # Integration with [`AwsBuilderInstrument`][`crate::middleware::aws::AwsBuilderInstrument`]
145///
146/// The recommended pattern is to use [`AwsBuilderInstrument::build_aws_span`][crate::middleware::aws::AwsBuilderInstrument::build_aws_span] to capture
147/// all the operation parameters before converting fluent builder into a paginator stream.
148///
149/// # Examples
150///
151/// ```rust
152/// use aws_sdk_dynamodb::{Client as DynamoClient, types::AttributeValue};
153/// use futures_util::TryStreamExt;
154/// use telemetry_rust::middleware::aws::{AwsBuilderInstrument, AwsStreamInstrument};
155///
156/// async fn query_table() -> Result<usize, Box<dyn std::error::Error>> {
157/// let config = aws_config::load_from_env().await;
158/// let dynamo_client = DynamoClient::new(&config);
159///
160/// // Build the query with all parameters
161/// let query = dynamo_client
162/// .query()
163/// .table_name("table_name")
164/// .index_name("my_index")
165/// .key_condition_expression("PK = :pk")
166/// .expression_attribute_values(":pk", AttributeValue::S("Test".to_string()));
167///
168/// // Extract span from fluent builder (includes all input attributes)
169/// let span = query.build_aws_span();
170///
171/// // Use the span to instrument the paginator stream.
172/// let items = query
173/// .into_paginator()
174/// .items()
175/// .send()
176/// .instrument(span)
177/// .try_collect::<Vec<_>>()
178/// .await?;
179///
180/// println!("DynamoDB items: {items:#?}");
181/// Ok(items.len())
182/// }
183/// ```
184pub trait AwsStreamInstrument<T, E, S>
185where
186 E: RequestId + Error,
187 S: Stream<Item = Result<T, E>>,
188{
189 /// Instruments the stream with OpenTelemetry tracing.
190 ///
191 /// This method wraps a [`TryStream`][`futures_util::TryStream`] in an [`InstrumentedStream`] that will:
192 /// - Start a span when the stream begins polling
193 /// - End the span with success when the stream completes normally
194 /// - End the span with error information if the stream encounters an error
195 ///
196 /// # Arguments
197 ///
198 /// * `span` - The span builder or span configuration to use for instrumentation
199 ///
200 /// # Returns
201 ///
202 /// An [`InstrumentedStream`] that wraps the original stream with telemetry capabilities.
203 fn instrument<'a>(
204 self,
205 span: impl Into<AwsSpanBuilder<'a>>,
206 ) -> InstrumentedStream<'a, S>;
207}
208
209impl<T, E, S> AwsStreamInstrument<T, E, S> for S
210where
211 E: RequestId + Error,
212 S: Stream<Item = Result<T, E>>,
213{
214 fn instrument<'a>(
215 self,
216 span: impl Into<AwsSpanBuilder<'a>>,
217 ) -> InstrumentedStream<'a, S> {
218 InstrumentedStream {
219 inner: self,
220 state: Cell::new(StreamState::new(span)),
221 }
222 }
223}
224
225impl<T, E> AwsStreamInstrument<T, E, PaginationStreamImplStream<Result<T, E>>>
226 for PaginationStream<Result<T, E>>
227where
228 E: RequestId + Error,
229{
230 fn instrument<'a>(
231 self,
232 span: impl Into<AwsSpanBuilder<'a>>,
233 ) -> InstrumentedStream<'a, PaginationStreamImplStream<Result<T, E>>> {
234 self.into_stream_03x().instrument(span)
235 }
236}