minitrace_futures/
lib.rs

1// Copyright 2024 TiKV Project Authors. Licensed under Apache-2.0.
2
3#![doc = include_str!("../README.md")]
4
5use std::pin::Pin;
6use std::task::Context;
7use std::task::Poll;
8
9use futures::Sink;
10use futures::Stream;
11use minitrace::Span;
12use pin_project_lite::pin_project;
13
14/// An extension trait for [`futures::Stream`] that provides tracing instrument adapters.
15pub trait StreamExt: futures::Stream + Sized {
16    /// Binds a [`Span`] to the [`Stream`] that continues to record until the stream is
17    /// **finished**.
18    ///
19    /// In addition, it sets the span as the local parent at every poll so that
20    /// [`minitrace::local::LocalSpan`] becomes available within the future. Internally, it
21    /// calls [`Span::set_local_parent`] when the executor polls it.
22    ///
23    /// # Examples:
24    ///
25    /// ```
26    /// # #[tokio::main]
27    /// # async fn main() {
28    /// use async_stream::stream;
29    /// use futures::StreamExt;
30    /// use minitrace::prelude::*;
31    /// use minitrace_futures::StreamExt as _;
32    ///
33    /// let root = Span::root("root", SpanContext::random());
34    /// let s = stream! {
35    ///     for i in 0..2 {
36    ///         yield i;
37    ///     }
38    /// }
39    /// .in_span(Span::enter_with_parent("task", &root));
40    ///
41    /// tokio::pin!(s);
42    ///
43    /// assert_eq!(s.next().await.unwrap(), 0);
44    /// assert_eq!(s.next().await.unwrap(), 1);
45    /// assert_eq!(s.next().await, None);
46    /// // span ends here.
47    /// # }
48    /// ```
49    fn in_span(self, span: Span) -> InSpan<Self> {
50        InSpan {
51            inner: self,
52            span: Some(span),
53        }
54    }
55}
56
57impl<T> StreamExt for T where T: futures::Stream {}
58
59/// An extension trait for [`futures::Sink`] that provides tracing instrument adapters.
60pub trait SinkExt<Item>: futures::Sink<Item> + Sized {
61    /// Binds a [`Span`] to the [`Sink`] that continues to record until the sink is **closed**.
62    ///
63    /// In addition, it sets the span as the local parent at every poll so that
64    /// [`minitrace::local::LocalSpan`] becomes available within the future. Internally, it
65    /// calls [`Span::set_local_parent`] when the executor polls it.
66    ///
67    /// # Examples:
68    ///
69    /// ```
70    /// # #[tokio::main]
71    /// # async fn main() {
72    /// use futures::sink;
73    /// use futures::sink::SinkExt;
74    /// use minitrace::prelude::*;
75    /// use minitrace_futures::SinkExt as _;
76    ///
77    /// let root = Span::root("root", SpanContext::random());
78    ///
79    /// let mut drain = sink::drain().in_span(Span::enter_with_parent("task", &root));
80    ///
81    /// drain.send(1).await.unwrap();
82    /// drain.send(2).await.unwrap();
83    /// drain.close().await.unwrap();
84    /// // span ends here.
85    /// # }
86    /// ```
87    fn in_span(self, span: Span) -> InSpan<Self> {
88        InSpan {
89            inner: self,
90            span: Some(span),
91        }
92    }
93}
94
95impl<T, Item> SinkExt<Item> for T where T: futures::Sink<Item> {}
96
97pin_project! {
98    /// Adapter for [`StreamExt::in_span()`](StreamExt::in_span) and [`SinkExt::in_span()`](SinkExt::in_span).
99    pub struct InSpan<T> {
100        #[pin]
101        inner: T,
102        span: Option<Span>,
103    }
104}
105
106impl<T> Stream for InSpan<T>
107where T: Stream
108{
109    type Item = T::Item;
110
111    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112        let this = self.project();
113
114        let _guard = this.span.as_ref().map(|s| s.set_local_parent());
115        let res = this.inner.poll_next(cx);
116
117        match res {
118            r @ Poll::Pending => r,
119            r @ Poll::Ready(None) => {
120                // finished
121                this.span.take();
122                r
123            }
124            other => other,
125        }
126    }
127}
128
129impl<T, I> Sink<I> for InSpan<T>
130where T: Sink<I>
131{
132    type Error = T::Error;
133
134    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
135        let this = self.project();
136        let _guard = this.span.as_ref().map(|s| s.set_local_parent());
137        this.inner.poll_ready(cx)
138    }
139
140    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
141        let this = self.project();
142        let _guard = this.span.as_ref().map(|s| s.set_local_parent());
143        this.inner.start_send(item)
144    }
145
146    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
147        let this = self.project();
148        let _guard = this.span.as_ref().map(|s| s.set_local_parent());
149        this.inner.poll_flush(cx)
150    }
151
152    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
153        let this = self.project();
154
155        let _guard = this.span.as_ref().map(|s| s.set_local_parent());
156        let res = this.inner.poll_close(cx);
157
158        match res {
159            r @ Poll::Pending => r,
160            other => {
161                // closed
162                this.span.take();
163                other
164            }
165        }
166    }
167}