rama_http/layer/trace/
on_eos.rs1use super::{DEFAULT_MESSAGE_LEVEL, Latency};
2use crate::header::HeaderMap;
3use crate::layer::classify::grpc_errors_as_failures::ParsedGrpcStatus;
4use rama_utils::latency::LatencyUnit;
5use std::time::Duration;
6use tracing::{Level, Span};
7
8pub trait OnEos: Send + Sync + 'static {
15 fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, span: &Span);
27}
28
29impl OnEos for () {
30 #[inline]
31 fn on_eos(self, _: Option<&HeaderMap>, _: Duration, _: &Span) {}
32}
33
34impl<F> OnEos for F
35where
36 F: Fn(Option<&HeaderMap>, Duration, &Span) + Send + Sync + 'static,
37{
38 fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, span: &Span) {
39 self(trailers, stream_duration, span)
40 }
41}
42
43#[derive(Clone, Debug)]
47pub struct DefaultOnEos {
48 level: Level,
49 latency_unit: LatencyUnit,
50}
51
52impl Default for DefaultOnEos {
53 fn default() -> Self {
54 Self {
55 level: DEFAULT_MESSAGE_LEVEL,
56 latency_unit: LatencyUnit::Millis,
57 }
58 }
59}
60
61impl DefaultOnEos {
62 pub fn new() -> Self {
64 Self::default()
65 }
66
67 pub fn level(mut self, level: Level) -> Self {
74 self.level = level;
75 self
76 }
77
78 pub fn set_level(&mut self, level: Level) -> &mut Self {
85 self.level = level;
86 self
87 }
88
89 pub fn latency_unit(mut self, latency_unit: LatencyUnit) -> Self {
93 self.latency_unit = latency_unit;
94 self
95 }
96
97 pub fn set_latency_unit(&mut self, latency_unit: LatencyUnit) -> &mut Self {
101 self.latency_unit = latency_unit;
102 self
103 }
104}
105
106impl OnEos for DefaultOnEos {
107 fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span) {
108 let stream_duration = Latency {
109 unit: self.latency_unit,
110 duration: stream_duration,
111 };
112 let status = trailers.and_then(|trailers| {
113 match crate::layer::classify::grpc_errors_as_failures::classify_grpc_metadata(
114 trailers,
115 crate::layer::classify::GrpcCode::Ok.into_bitmask(),
116 ) {
117 ParsedGrpcStatus::Success
118 | ParsedGrpcStatus::HeaderNotString
119 | ParsedGrpcStatus::HeaderNotInt => Some(0),
120 ParsedGrpcStatus::NonSuccess(status) => Some(status.get()),
121 ParsedGrpcStatus::GrpcStatusHeaderMissing => None,
122 }
123 });
124
125 event_dynamic_lvl!(self.level, %stream_duration, status, "end of stream");
126 }
127}