telemetry_rust/middleware/aws/instrumentation/
stream.rs

1use aws_smithy_async::future::pagination_stream::PaginationStream;
2use aws_smithy_types_convert::stream::{PaginationStreamExt, PaginationStreamImplStream};
3use aws_types::request_id::RequestId;
4use futures_util::Stream;
5use pin_project_lite::pin_project;
6use std::{
7    cell::Cell,
8    error::Error,
9    pin::Pin,
10    task::{Context, Poll},
11};
12
13use crate::{
14    KeyValue,
15    middleware::aws::{AwsSpan, AwsSpanBuilder},
16};
17
18/// A no-op implementation of [`RequestId`] for internal use.
19///
20/// This is used in place of an AWS response to satisfy [`AwsSpan::end`] trait bounds,
21/// because we don't have access to the real response with request ID information.
22struct Void;
23
24impl RequestId for Void {
25    fn request_id(&self) -> Option<&str> {
26        None
27    }
28}
29
/// Payload-free discriminant of [`StreamState`].
///
/// `poll_next` matches on this value instead of on the state itself. The match therefore
/// holds no borrow of the state's contents, and the arms are free to replace the state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamStateKind {
    /// The span builder is waiting for the first poll.
    Waiting,
    /// The span has been started and the inner stream is being polled.
    Flowing,
    /// The span has been ended; the stream yields no further items.
    Finished,
}
35
36#[derive(Default)]
37enum StreamState<'a> {
38    Waiting(Box<AwsSpanBuilder<'a>>),
39    Flowing(AwsSpan),
40    Finished,
41    #[default]
42    Invalid,
43}
44
45impl<'a> StreamState<'a> {
46    fn new(span: impl Into<AwsSpanBuilder<'a>>) -> Self {
47        let span = Into::<AwsSpanBuilder>::into(span);
48        Self::Waiting(Box::new(
49            span.attribute(KeyValue::new("aws.pagination_stream", true)),
50        ))
51    }
52
53    fn kind(&self) -> StreamStateKind {
54        match self {
55            StreamState::Waiting(_) => StreamStateKind::Waiting,
56            StreamState::Flowing(_) => StreamStateKind::Flowing,
57            StreamState::Finished => StreamStateKind::Finished,
58            StreamState::Invalid => {
59                panic!("Invalid instrumented stream state")
60            }
61        }
62    }
63
64    fn start(self) -> Self {
65        let Self::Waiting(span) = self else {
66            panic!("Instrumented stream state is not Waiting");
67        };
68        Self::Flowing(span.start())
69    }
70
71    fn end<E: RequestId + Error>(self, aws_response: &Result<Void, E>) -> Self {
72        let Self::Flowing(span) = self else {
73            panic!("Instrumented stream state is not Flowing");
74        };
75        span.end(aws_response);
76        Self::Finished
77    }
78}
79
80pin_project! {
81    /// A wrapper around a Stream that provides OpenTelemetry instrumentation for AWS operations.
82    ///
83    /// This struct automatically creates spans for stream operations and handles proper
84    /// span lifecycle management including error handling and completion tracking.
85    ///
86    /// The instrumented stream automatically adds the `aws.pagination_stream = true` attribute
87    /// to help identify pagination/streaming operations in traces.
88    ///
89    /// The instrumented stream maintains state to track the span lifecycle:
90    /// - `Waiting`: Initial state with a span builder ready to start
91    /// - `Flowing`: Active state with an ongoing span
92    /// - `Finished`: Terminal state after the stream completes or errors
93    pub struct InstrumentedStream<'a, S: Stream> {
94        #[pin]
95        inner: S,
96        state: Cell<StreamState<'a>>,
97    }
98}
99
100impl<T, E, S> Stream for InstrumentedStream<'_, S>
101where
102    E: RequestId + Error,
103    S: Stream<Item = Result<T, E>>,
104{
105    type Item = S::Item;
106
107    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
108        let this = self.project();
109        match this.state.get_mut().kind() {
110            StreamStateKind::Waiting => {
111                this.state.set(this.state.take().start());
112                this.inner.poll_next(cx)
113            }
114            StreamStateKind::Flowing => match this.inner.poll_next(cx) {
115                Poll::Ready(None) => {
116                    this.state.set(this.state.take().end(&Ok::<_, E>(Void)));
117                    Poll::Ready(None)
118                }
119                Poll::Ready(Some(Err(err))) => {
120                    let aws_result = Err(err);
121                    this.state.set(this.state.take().end(&aws_result));
122                    Poll::Ready(aws_result.err().map(Err))
123                }
124                result => result,
125            },
126            StreamStateKind::Finished => Poll::Ready(None),
127        }
128    }
129}
130
131/// A trait for adding OpenTelemetry instrumentation to AWS pagination streams.
132///
133/// This trait provides the `instrument` method that wraps streams with telemetry
134/// capabilities, automatically creating and managing spans for AWS operations.
135/// It is designed for AWS pagination streams, but it is implemented for any
136/// [`TryStream`][`futures_util::TryStream`] yielding AWS SDK errors.
137///
138/// The entire stream gets represented by a single [`AwsSpan`] regardless of the number
139/// of AWS SDK requests sent by the SDK to produce it.
140///
141/// All instrumented streams automatically include the `aws.pagination_stream = true`
142/// attribute to help identify streaming operations in traces.
143///
144/// # Integration with [`AwsBuilderInstrument`][`crate::middleware::aws::AwsBuilderInstrument`]
145///
146/// The recommended pattern is to use [`AwsBuilderInstrument::build_aws_span`][crate::middleware::aws::AwsBuilderInstrument::build_aws_span] to capture
147/// all the operation parameters before converting fluent builder into a paginator stream.
148///
149/// # Examples
150///
151/// ```rust
152/// use aws_sdk_dynamodb::{Client as DynamoClient, types::AttributeValue};
153/// use futures_util::TryStreamExt;
154/// use telemetry_rust::middleware::aws::{AwsBuilderInstrument, AwsStreamInstrument};
155///
156/// async fn query_table() -> Result<usize, Box<dyn std::error::Error>> {
157///     let config = aws_config::load_from_env().await;
158///     let dynamo_client = DynamoClient::new(&config);
159///
160///     // Build the query with all parameters
161///     let query = dynamo_client
162///         .query()
163///         .table_name("table_name")
164///         .index_name("my_index")
165///         .key_condition_expression("PK = :pk")
166///         .expression_attribute_values(":pk", AttributeValue::S("Test".to_string()));
167///
168///     // Extract span from fluent builder (includes all input attributes)
169///     let span = query.build_aws_span();
170///
171///     // Use the span to instrument the paginator stream.
172///     let items = query
173///         .into_paginator()
174///         .items()
175///         .send()
176///         .instrument(span)
177///         .try_collect::<Vec<_>>()
178///         .await?;
179///
180///     println!("DynamoDB items: {items:#?}");
181///     Ok(items.len())
182/// }
183/// ```
184pub trait AwsStreamInstrument<T, E, S>
185where
186    E: RequestId + Error,
187    S: Stream<Item = Result<T, E>>,
188{
189    /// Instruments the stream with OpenTelemetry tracing.
190    ///
191    /// This method wraps a [`TryStream`][`futures_util::TryStream`] in an [`InstrumentedStream`] that will:
192    /// - Start a span when the stream begins polling
193    /// - End the span with success when the stream completes normally
194    /// - End the span with error information if the stream encounters an error
195    ///
196    /// # Arguments
197    ///
198    /// * `span` - The span builder or span configuration to use for instrumentation
199    ///
200    /// # Returns
201    ///
202    /// An [`InstrumentedStream`] that wraps the original stream with telemetry capabilities.
203    fn instrument<'a>(
204        self,
205        span: impl Into<AwsSpanBuilder<'a>>,
206    ) -> InstrumentedStream<'a, S>;
207}
208
209impl<T, E, S> AwsStreamInstrument<T, E, S> for S
210where
211    E: RequestId + Error,
212    S: Stream<Item = Result<T, E>>,
213{
214    fn instrument<'a>(
215        self,
216        span: impl Into<AwsSpanBuilder<'a>>,
217    ) -> InstrumentedStream<'a, S> {
218        InstrumentedStream {
219            inner: self,
220            state: Cell::new(StreamState::new(span)),
221        }
222    }
223}
224
225impl<T, E> AwsStreamInstrument<T, E, PaginationStreamImplStream<Result<T, E>>>
226    for PaginationStream<Result<T, E>>
227where
228    E: RequestId + Error,
229{
230    fn instrument<'a>(
231        self,
232        span: impl Into<AwsSpanBuilder<'a>>,
233    ) -> InstrumentedStream<'a, PaginationStreamImplStream<Result<T, E>>> {
234        self.into_stream_03x().instrument(span)
235    }
236}