tracing_lv/
futures.rs

1use crate::tokio::{POLL_RESULT, VALUE};
2use futures_util::Stream;
3use pin_project::pin_project;
4use std::any::type_name;
5use std::fmt::Debug;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use tracing::{info_span, Span};
9use tracing_core::field::debug;
10
11#[pin_project]
12pub struct TLInstrumentedStream<T> {
13    #[pin]
14    inner: T,
15    span: Span,
16}
17
18impl<T> Stream for TLInstrumentedStream<T>
19where
20    T: Stream<Item: Debug>,
21{
22    type Item = T::Item;
23
24    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
25        let me = self.project();
26        let _entered = me.span.enter();
27        let poll = me.inner.poll_next(cx);
28        match &poll {
29            Poll::Ready(n) => {
30                me.span.record(VALUE, n.as_ref().map(debug));
31            }
32            Poll::Pending => {
33                me.span.record(POLL_RESULT, debug(&poll));
34            }
35        }
36        poll
37    }
38}
39
40pub trait TLStreamExt: Sized {
41    fn instrument_stream(self, name: &'static str) -> TLInstrumentedStream<Self>;
42}
43
44impl<T> TLStreamExt for T
45where
46    T: Stream,
47{
48    fn instrument_stream(self, name: &'static str) -> TLInstrumentedStream<Self> {
49        TLInstrumentedStream {
50            inner: self,
51            span: info_span!(
52                "[t:Stream]",
53                item_type = type_name::<T::Item>(),
54                "type" = type_name::<T>(),
55                name,
56                value = 0,
57                poll_result = ""
58            ),
59        }
60    }
61}