1use crate::tokio::{POLL_RESULT, VALUE};
2use futures_util::Stream;
3use pin_project::pin_project;
4use std::any::type_name;
5use std::fmt::Debug;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use tracing::{info_span, Span};
9use tracing_core::field::debug;
10
11#[pin_project]
12pub struct TLInstrumentedStream<T> {
13 #[pin]
14 inner: T,
15 span: Span,
16}
17
18impl<T> Stream for TLInstrumentedStream<T>
19where
20 T: Stream<Item: Debug>,
21{
22 type Item = T::Item;
23
24 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
25 let me = self.project();
26 let _entered = me.span.enter();
27 let poll = me.inner.poll_next(cx);
28 match &poll {
29 Poll::Ready(n) => {
30 me.span.record(VALUE, n.as_ref().map(debug));
31 }
32 Poll::Pending => {
33 me.span.record(POLL_RESULT, debug(&poll));
34 }
35 }
36 poll
37 }
38}
39
/// Extension trait that adds span instrumentation to a value by wrapping it
/// in a [`TLInstrumentedStream`].
pub trait TLStreamExt: Sized {
    /// Consumes `self` and returns it wrapped in a [`TLInstrumentedStream`].
    ///
    /// `name` is a static label supplied by the caller to identify the
    /// instrumented value.
    fn instrument_stream(self, name: &'static str) -> TLInstrumentedStream<Self>;
}
43
44impl<T> TLStreamExt for T
45where
46 T: Stream,
47{
48 fn instrument_stream(self, name: &'static str) -> TLInstrumentedStream<Self> {
49 TLInstrumentedStream {
50 inner: self,
51 span: info_span!(
52 "[t:Stream]",
53 item_type = type_name::<T::Item>(),
54 "type" = type_name::<T>(),
55 name,
56 value = 0,
57 poll_result = ""
58 ),
59 }
60 }
61}