rama_http/layer/trace/
on_eos.rs

1use super::{DEFAULT_MESSAGE_LEVEL, Latency};
2use crate::header::HeaderMap;
3use crate::layer::classify::grpc_errors_as_failures::ParsedGrpcStatus;
4use rama_utils::latency::LatencyUnit;
5use std::time::Duration;
6use tracing::{Level, Span};
7
8/// Trait used to tell [`Trace`] what to do when a stream closes.
9///
10/// See the [module docs](../trace/index.html#on_eos) for details on exactly when the `on_eos`
11/// callback is called.
12///
13/// [`Trace`]: super::Trace
14pub trait OnEos: Send + Sync + 'static {
15    /// Do the thing.
16    ///
17    /// `stream_duration` is the duration since the response was sent.
18    ///
19    /// `span` is the `tracing` [`Span`], corresponding to this request, produced by the closure
20    /// passed to [`TraceLayer::make_span_with`]. It can be used to [record field values][record]
21    /// that weren't known when the span was created.
22    ///
23    /// [`Span`]: https://docs.rs/tracing/latest/tracing/span/index.html
24    /// [record]: https://docs.rs/tracing/latest/tracing/span/struct.Span.html#method.record
25    /// [`TraceLayer::make_span_with`]: crate::layer::trace::TraceLayer::make_span_with
26    fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, span: &Span);
27}
28
29impl OnEos for () {
30    #[inline]
31    fn on_eos(self, _: Option<&HeaderMap>, _: Duration, _: &Span) {}
32}
33
34impl<F> OnEos for F
35where
36    F: Fn(Option<&HeaderMap>, Duration, &Span) + Send + Sync + 'static,
37{
38    fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, span: &Span) {
39        self(trailers, stream_duration, span)
40    }
41}
42
43/// The default [`OnEos`] implementation used by [`Trace`].
44///
45/// [`Trace`]: super::Trace
46#[derive(Clone, Debug)]
47pub struct DefaultOnEos {
48    level: Level,
49    latency_unit: LatencyUnit,
50}
51
52impl Default for DefaultOnEos {
53    fn default() -> Self {
54        Self {
55            level: DEFAULT_MESSAGE_LEVEL,
56            latency_unit: LatencyUnit::Millis,
57        }
58    }
59}
60
61impl DefaultOnEos {
62    /// Create a new [`DefaultOnEos`].
63    pub fn new() -> Self {
64        Self::default()
65    }
66
67    /// Set the [`Level`] used for [tracing events].
68    ///
69    /// Defaults to [`Level::DEBUG`].
70    ///
71    /// [tracing events]: https://docs.rs/tracing/latest/tracing/#events
72    /// [`Level::DEBUG`]: https://docs.rs/tracing/latest/tracing/struct.Level.html#associatedconstant.DEBUG
73    pub fn level(mut self, level: Level) -> Self {
74        self.level = level;
75        self
76    }
77
78    /// Set the [`Level`] used for [tracing events].
79    ///
80    /// Defaults to [`Level::DEBUG`].
81    ///
82    /// [tracing events]: https://docs.rs/tracing/latest/tracing/#events
83    /// [`Level::DEBUG`]: https://docs.rs/tracing/latest/tracing/struct.Level.html#associatedconstant.DEBUG
84    pub fn set_level(&mut self, level: Level) -> &mut Self {
85        self.level = level;
86        self
87    }
88
89    /// Set the [`LatencyUnit`] latencies will be reported in.
90    ///
91    /// Defaults to [`LatencyUnit::Millis`].
92    pub fn latency_unit(mut self, latency_unit: LatencyUnit) -> Self {
93        self.latency_unit = latency_unit;
94        self
95    }
96
97    /// Set the [`LatencyUnit`] latencies will be reported in.
98    ///
99    /// Defaults to [`LatencyUnit::Millis`].
100    pub fn set_latency_unit(&mut self, latency_unit: LatencyUnit) -> &mut Self {
101        self.latency_unit = latency_unit;
102        self
103    }
104}
105
106impl OnEos for DefaultOnEos {
107    fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span) {
108        let stream_duration = Latency {
109            unit: self.latency_unit,
110            duration: stream_duration,
111        };
112        let status = trailers.and_then(|trailers| {
113            match crate::layer::classify::grpc_errors_as_failures::classify_grpc_metadata(
114                trailers,
115                crate::layer::classify::GrpcCode::Ok.into_bitmask(),
116            ) {
117                ParsedGrpcStatus::Success
118                | ParsedGrpcStatus::HeaderNotString
119                | ParsedGrpcStatus::HeaderNotInt => Some(0),
120                ParsedGrpcStatus::NonSuccess(status) => Some(status.get()),
121                ParsedGrpcStatus::GrpcStatusHeaderMissing => None,
122            }
123        });
124
125        event_dynamic_lvl!(self.level, %stream_duration, status, "end of stream");
126    }
127}