Skip to main content

telemetry_rust/middleware/aws/instrumentation/
stream.rs

1use aws_smithy_async::future::pagination_stream::PaginationStream;
2use aws_smithy_types::error::metadata::ProvideErrorMetadata;
3use aws_smithy_types_convert::stream::{PaginationStreamExt, PaginationStreamImplStream};
4use aws_types::request_id::RequestId;
5use futures_util::Stream;
6use pin_project_lite::pin_project;
7use std::{
8    cell::Cell,
9    error::Error,
10    pin::Pin,
11    task::{Context, Poll},
12};
13
14use crate::{
15    KeyValue,
16    middleware::aws::{AwsSpan, AwsSpanBuilder},
17};
18
19/// A no-op implementation of [`RequestId`] for internal use.
20///
21/// This is used in place of an AWS response to satisfy [`AwsSpan::end`] trait bounds,
22/// because we don't have access to the real response with request ID information.
23struct Void;
24
25impl RequestId for Void {
26    fn request_id(&self) -> Option<&str> {
27        None
28    }
29}
30
/// Data-free tag mirroring the valid variants of [`StreamState`].
///
/// It lets callers branch on the current phase without borrowing or moving
/// the span data held inside the state itself.
enum StreamStateKind {
    /// The span has been configured but not started yet.
    Waiting,
    /// The span is started and the stream is being polled.
    Flowing,
    /// The span has been ended.
    Finished,
}
36
37#[derive(Default)]
38enum StreamState<'a> {
39    Waiting(Box<AwsSpanBuilder<'a>>),
40    Flowing(AwsSpan),
41    Finished,
42    #[default]
43    Invalid,
44}
45
46impl<'a> StreamState<'a> {
47    fn new(span: impl Into<AwsSpanBuilder<'a>>) -> Self {
48        let span = Into::<AwsSpanBuilder>::into(span);
49        Self::Waiting(Box::new(
50            span.attribute(KeyValue::new("aws.pagination_stream", true)),
51        ))
52    }
53
54    fn kind(&self) -> StreamStateKind {
55        match self {
56            StreamState::Waiting(_) => StreamStateKind::Waiting,
57            StreamState::Flowing(_) => StreamStateKind::Flowing,
58            StreamState::Finished => StreamStateKind::Finished,
59            StreamState::Invalid => {
60                panic!("Invalid instrumented stream state")
61            }
62        }
63    }
64
65    fn start(self) -> Self {
66        let Self::Waiting(span) = self else {
67            panic!("Instrumented stream state is not Waiting");
68        };
69        Self::Flowing(span.start())
70    }
71
72    fn end<E: RequestId + ProvideErrorMetadata + Error>(
73        self,
74        aws_response: &Result<Void, E>,
75    ) -> Self {
76        let Self::Flowing(span) = self else {
77            panic!("Instrumented stream state is not Flowing");
78        };
79        span.end(aws_response);
80        Self::Finished
81    }
82}
83
84pin_project! {
85    /// A wrapper around a Stream that provides OpenTelemetry instrumentation for AWS operations.
86    ///
87    /// This struct automatically creates spans for stream operations and handles proper
88    /// span lifecycle management including error handling and completion tracking.
89    ///
90    /// The instrumented stream automatically adds the `aws.pagination_stream = true` attribute
91    /// to help identify pagination/streaming operations in traces.
92    ///
93    /// The instrumented stream maintains state to track the span lifecycle:
94    /// - `Waiting`: Initial state with a span builder ready to start
95    /// - `Flowing`: Active state with an ongoing span
96    /// - `Finished`: Terminal state after the stream completes or errors
97    pub struct InstrumentedStream<'a, S: Stream> {
98        #[pin]
99        inner: S,
100        state: Cell<StreamState<'a>>,
101    }
102}
103
104impl<T, E, S> Stream for InstrumentedStream<'_, S>
105where
106    E: RequestId + ProvideErrorMetadata + Error,
107    S: Stream<Item = Result<T, E>>,
108{
109    type Item = S::Item;
110
111    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112        let this = self.project();
113        match this.state.get_mut().kind() {
114            StreamStateKind::Waiting => {
115                this.state.set(this.state.take().start());
116                this.inner.poll_next(cx)
117            }
118            StreamStateKind::Flowing => match this.inner.poll_next(cx) {
119                Poll::Ready(None) => {
120                    this.state.set(this.state.take().end(&Ok::<_, E>(Void)));
121                    Poll::Ready(None)
122                }
123                Poll::Ready(Some(Err(err))) => {
124                    let aws_result = Err(err);
125                    this.state.set(this.state.take().end(&aws_result));
126                    Poll::Ready(aws_result.err().map(Err))
127                }
128                result => result,
129            },
130            StreamStateKind::Finished => Poll::Ready(None),
131        }
132    }
133}
134
135/// A trait for adding OpenTelemetry instrumentation to AWS pagination streams.
136///
137/// This trait provides the `instrument` method that wraps streams with telemetry
138/// capabilities, automatically creating and managing spans for AWS operations.
139/// It is designed for AWS pagination streams, but it is implemented for any
140/// [`TryStream`][`futures_util::TryStream`] yielding AWS SDK errors.
141///
142/// The entire stream gets represented by a single [`AwsSpan`] regardless of the number
143/// of AWS SDK requests sent by the SDK to produce it.
144///
145/// All instrumented streams automatically include the `aws.pagination_stream = true`
146/// attribute to help identify streaming operations in traces.
147///
148/// # Integration with [`AwsBuilderInstrument`][`crate::middleware::aws::AwsBuilderInstrument`]
149///
150/// The recommended pattern is to use [`AwsBuilderInstrument::build_aws_span`][crate::middleware::aws::AwsBuilderInstrument::build_aws_span] to capture
151/// all the operation parameters before converting fluent builder into a paginator stream.
152///
153/// # Examples
154///
155/// ```rust
156/// use aws_sdk_dynamodb::{Client as DynamoClient, types::AttributeValue};
157/// use futures_util::TryStreamExt;
158/// use telemetry_rust::middleware::aws::{AwsBuilderInstrument, AwsStreamInstrument};
159///
160/// async fn query_table() -> Result<usize, Box<dyn std::error::Error>> {
161///     let config = aws_config::load_from_env().await;
162///     let dynamo_client = DynamoClient::new(&config);
163///
164///     // Build the query with all parameters
165///     let query = dynamo_client
166///         .query()
167///         .table_name("table_name")
168///         .index_name("my_index")
169///         .key_condition_expression("PK = :pk")
170///         .expression_attribute_values(":pk", AttributeValue::S("Test".to_string()));
171///
172///     // Extract span from fluent builder (includes all input attributes)
173///     let span = query.build_aws_span();
174///
175///     // Use the span to instrument the paginator stream.
176///     let items = query
177///         .into_paginator()
178///         .items()
179///         .send()
180///         .instrument(span)
181///         .try_collect::<Vec<_>>()
182///         .await?;
183///
184///     println!("DynamoDB items: {items:#?}");
185///     Ok(items.len())
186/// }
187/// ```
188pub trait AwsStreamInstrument<T, E, S>
189where
190    E: RequestId + ProvideErrorMetadata + Error,
191    S: Stream<Item = Result<T, E>>,
192{
193    /// Instruments the stream with OpenTelemetry tracing.
194    ///
195    /// This method wraps a [`TryStream`][`futures_util::TryStream`] in an [`InstrumentedStream`] that will:
196    /// - Start a span when the stream begins polling
197    /// - End the span with success when the stream completes normally
198    /// - End the span with error information if the stream encounters an error
199    ///
200    /// # Arguments
201    ///
202    /// * `span` - The span builder or span configuration to use for instrumentation
203    ///
204    /// # Returns
205    ///
206    /// An [`InstrumentedStream`] that wraps the original stream with telemetry capabilities.
207    fn instrument<'a>(
208        self,
209        span: impl Into<AwsSpanBuilder<'a>>,
210    ) -> InstrumentedStream<'a, S>;
211}
212
213impl<T, E, S> AwsStreamInstrument<T, E, S> for S
214where
215    E: RequestId + ProvideErrorMetadata + Error,
216    S: Stream<Item = Result<T, E>>,
217{
218    fn instrument<'a>(
219        self,
220        span: impl Into<AwsSpanBuilder<'a>>,
221    ) -> InstrumentedStream<'a, S> {
222        InstrumentedStream {
223            inner: self,
224            state: Cell::new(StreamState::new(span)),
225        }
226    }
227}
228
229impl<T, E> AwsStreamInstrument<T, E, PaginationStreamImplStream<Result<T, E>>>
230    for PaginationStream<Result<T, E>>
231where
232    E: RequestId + ProvideErrorMetadata + Error,
233{
234    fn instrument<'a>(
235        self,
236        span: impl Into<AwsSpanBuilder<'a>>,
237    ) -> InstrumentedStream<'a, PaginationStreamImplStream<Result<T, E>>> {
238        self.into_stream_03x().instrument(span)
239    }
240}