minitrace_futures/lib.rs
1// Copyright 2024 TiKV Project Authors. Licensed under Apache-2.0.
2
3#![doc = include_str!("../README.md")]
4
5use std::pin::Pin;
6use std::task::Context;
7use std::task::Poll;
8
9use futures::Sink;
10use futures::Stream;
11use minitrace::Span;
12use pin_project_lite::pin_project;
13
14/// An extension trait for [`futures::Stream`] that provides tracing instrument adapters.
15pub trait StreamExt: futures::Stream + Sized {
16 /// Binds a [`Span`] to the [`Stream`] that continues to record until the stream is
17 /// **finished**.
18 ///
19 /// In addition, it sets the span as the local parent at every poll so that
20 /// [`minitrace::local::LocalSpan`] becomes available within the future. Internally, it
21 /// calls [`Span::set_local_parent`] when the executor polls it.
22 ///
23 /// # Examples:
24 ///
25 /// ```
26 /// # #[tokio::main]
27 /// # async fn main() {
28 /// use async_stream::stream;
29 /// use futures::StreamExt;
30 /// use minitrace::prelude::*;
31 /// use minitrace_futures::StreamExt as _;
32 ///
33 /// let root = Span::root("root", SpanContext::random());
34 /// let s = stream! {
35 /// for i in 0..2 {
36 /// yield i;
37 /// }
38 /// }
39 /// .in_span(Span::enter_with_parent("task", &root));
40 ///
41 /// tokio::pin!(s);
42 ///
43 /// assert_eq!(s.next().await.unwrap(), 0);
44 /// assert_eq!(s.next().await.unwrap(), 1);
45 /// assert_eq!(s.next().await, None);
46 /// // span ends here.
47 /// # }
48 /// ```
49 fn in_span(self, span: Span) -> InSpan<Self> {
50 InSpan {
51 inner: self,
52 span: Some(span),
53 }
54 }
55}
56
57impl<T> StreamExt for T where T: futures::Stream {}
58
59/// An extension trait for [`futures::Sink`] that provides tracing instrument adapters.
60pub trait SinkExt<Item>: futures::Sink<Item> + Sized {
61 /// Binds a [`Span`] to the [`Sink`] that continues to record until the sink is **closed**.
62 ///
63 /// In addition, it sets the span as the local parent at every poll so that
64 /// [`minitrace::local::LocalSpan`] becomes available within the future. Internally, it
65 /// calls [`Span::set_local_parent`] when the executor polls it.
66 ///
67 /// # Examples:
68 ///
69 /// ```
70 /// # #[tokio::main]
71 /// # async fn main() {
72 /// use futures::sink;
73 /// use futures::sink::SinkExt;
74 /// use minitrace::prelude::*;
75 /// use minitrace_futures::SinkExt as _;
76 ///
77 /// let root = Span::root("root", SpanContext::random());
78 ///
79 /// let mut drain = sink::drain().in_span(Span::enter_with_parent("task", &root));
80 ///
81 /// drain.send(1).await.unwrap();
82 /// drain.send(2).await.unwrap();
83 /// drain.close().await.unwrap();
84 /// // span ends here.
85 /// # }
86 /// ```
87 fn in_span(self, span: Span) -> InSpan<Self> {
88 InSpan {
89 inner: self,
90 span: Some(span),
91 }
92 }
93}
94
95impl<T, Item> SinkExt<Item> for T where T: futures::Sink<Item> {}
96
97pin_project! {
98 /// Adapter for [`StreamExt::in_span()`](StreamExt::in_span) and [`SinkExt::in_span()`](SinkExt::in_span).
99 pub struct InSpan<T> {
100 #[pin]
101 inner: T,
102 span: Option<Span>,
103 }
104}
105
106impl<T> Stream for InSpan<T>
107where T: Stream
108{
109 type Item = T::Item;
110
111 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112 let this = self.project();
113
114 let _guard = this.span.as_ref().map(|s| s.set_local_parent());
115 let res = this.inner.poll_next(cx);
116
117 match res {
118 r @ Poll::Pending => r,
119 r @ Poll::Ready(None) => {
120 // finished
121 this.span.take();
122 r
123 }
124 other => other,
125 }
126 }
127}
128
129impl<T, I> Sink<I> for InSpan<T>
130where T: Sink<I>
131{
132 type Error = T::Error;
133
134 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
135 let this = self.project();
136 let _guard = this.span.as_ref().map(|s| s.set_local_parent());
137 this.inner.poll_ready(cx)
138 }
139
140 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
141 let this = self.project();
142 let _guard = this.span.as_ref().map(|s| s.set_local_parent());
143 this.inner.start_send(item)
144 }
145
146 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
147 let this = self.project();
148 let _guard = this.span.as_ref().map(|s| s.set_local_parent());
149 this.inner.poll_flush(cx)
150 }
151
152 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
153 let this = self.project();
154
155 let _guard = this.span.as_ref().map(|s| s.set_local_parent());
156 let res = this.inner.poll_close(cx);
157
158 match res {
159 r @ Poll::Pending => r,
160 other => {
161 // closed
162 this.span.take();
163 other
164 }
165 }
166 }
167}