// blockscout_tracing_actix_web/middleware.rs
use crate::{DefaultRootSpanBuilder, RequestId, RootSpan, RootSpanBuilder};
2use actix_web::body::{BodySize, MessageBody};
3use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
4use actix_web::http::StatusCode;
5use actix_web::web::Bytes;
6use actix_web::{Error, HttpMessage, ResponseError};
7use std::future::{ready, Future, Ready};
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use tracing::Span;
11
12pub struct TracingLogger<RootSpan: RootSpanBuilder> {
83 root_span_builder: std::marker::PhantomData<RootSpan>,
84}
85
86impl<RootSpan: RootSpanBuilder> Clone for TracingLogger<RootSpan> {
87 fn clone(&self) -> Self {
88 Self::new()
89 }
90}
91
92impl Default for TracingLogger<DefaultRootSpanBuilder> {
93 fn default() -> Self {
94 TracingLogger::new()
95 }
96}
97
98impl<RootSpan: RootSpanBuilder> TracingLogger<RootSpan> {
99 pub fn new() -> TracingLogger<RootSpan> {
100 TracingLogger {
101 root_span_builder: Default::default(),
102 }
103 }
104}
105
106impl<S, B, RootSpan> Transform<S, ServiceRequest> for TracingLogger<RootSpan>
107where
108 S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
109 S::Future: 'static,
110 B: MessageBody + 'static,
111 RootSpan: RootSpanBuilder,
112{
113 type Response = ServiceResponse<StreamSpan<B>>;
114 type Error = Error;
115 type Transform = TracingLoggerMiddleware<S, RootSpan>;
116 type InitError = ();
117 type Future = Ready<Result<Self::Transform, Self::InitError>>;
118
119 fn new_transform(&self, service: S) -> Self::Future {
120 ready(Ok(TracingLoggerMiddleware {
121 service,
122 root_span_builder: std::marker::PhantomData,
123 }))
124 }
125}
126
/// Marker stored in a request's extensions to silence the HTTP trace events.
///
/// When a value of this type is present in the request extensions at the time
/// the tracing middleware handles the request, the middleware does not emit
/// its "Started HTTP request processing" / "Finished HTTP request processing"
/// info events for that request.
#[derive(Debug, Clone, Copy, Default)]
pub struct SkipHttpTrace;
128
/// Service wrapper produced by the `TracingLogger` factory.
///
/// `S` is the wrapped inner service. `RootSpanType` is a type-level marker for
/// the span builder in use; no value of that type is stored.
#[doc(hidden)]
pub struct TracingLoggerMiddleware<S, RootSpanType> {
    /// The inner service that requests are forwarded to.
    service: S,
    /// Zero-sized marker carrying the span builder type.
    root_span_builder: std::marker::PhantomData<RootSpanType>,
}
134
135#[allow(clippy::type_complexity)]
136impl<S, B, RootSpanType> Service<ServiceRequest> for TracingLoggerMiddleware<S, RootSpanType>
137where
138 S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
139 S::Future: 'static,
140 B: MessageBody + 'static,
141 RootSpanType: RootSpanBuilder,
142{
143 type Response = ServiceResponse<StreamSpan<B>>;
144 type Error = Error;
145 type Future = TracingResponse<S::Future, RootSpanType>;
146
147 actix_web::dev::forward_ready!(service);
148
149 fn call(&self, req: ServiceRequest) -> Self::Future {
150 req.extensions_mut().insert(RequestId::generate());
151 let root_span = RootSpanType::on_request_start(&req);
152
153 let root_span_wrapper = RootSpan::new(root_span.clone());
154 req.extensions_mut().insert(root_span_wrapper);
155
156 let skip_http_trace = req.extensions().contains::<SkipHttpTrace>();
157
158 let fut = root_span.in_scope(|| {
159 if !skip_http_trace {
160 tracing::info!("Started HTTP request processing");
161 }
162 self.service.call(req)
163 });
164
165 TracingResponse {
166 fut,
167 span: root_span,
168 skip_http_trace,
169 _root_span_type: std::marker::PhantomData,
170 }
171 }
172}
173
174#[doc(hidden)]
175#[pin_project::pin_project]
176pub struct TracingResponse<F, RootSpanType> {
177 #[pin]
178 fut: F,
179 span: Span,
180 skip_http_trace: bool,
181 _root_span_type: std::marker::PhantomData<RootSpanType>,
182}
183
184#[doc(hidden)]
185#[pin_project::pin_project]
186pub struct StreamSpan<B> {
187 #[pin]
188 body: B,
189 span: Span,
190}
191
192impl<F, B, RootSpanType> Future for TracingResponse<F, RootSpanType>
193where
194 F: Future<Output = Result<ServiceResponse<B>, Error>>,
195 B: MessageBody + 'static,
196 RootSpanType: RootSpanBuilder,
197{
198 type Output = Result<ServiceResponse<StreamSpan<B>>, Error>;
199
200 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
201 let this = self.project();
202
203 let fut = this.fut;
204 let span = this.span;
205 let skip_http_trace = *this.skip_http_trace;
206
207 span.in_scope(|| match fut.poll(cx) {
208 Poll::Pending => Poll::Pending,
209 Poll::Ready(outcome) => {
210 RootSpanType::on_request_end(Span::current(), &outcome);
211
212 if !skip_http_trace {
213 tracing::info!("Finished HTTP request processing");
214 }
215
216 #[cfg(feature = "emit_event_on_error")]
217 {
218 emit_event_on_error(&outcome);
219 }
220
221 Poll::Ready(outcome.map(|service_response| {
222 service_response.map_body(|_, body| StreamSpan {
223 body,
224 span: span.clone(),
225 })
226 }))
227 }
228 })
229 }
230}
231
232impl<B> MessageBody for StreamSpan<B>
233where
234 B: MessageBody,
235{
236 type Error = B::Error;
237
238 fn size(&self) -> BodySize {
239 self.body.size()
240 }
241
242 fn poll_next(
243 self: Pin<&mut Self>,
244 cx: &mut Context<'_>,
245 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
246 let this = self.project();
247
248 let body = this.body;
249 let span = this.span;
250 span.in_scope(|| body.poll_next(cx))
251 }
252}
253
254fn emit_event_on_error<B: 'static>(outcome: &Result<ServiceResponse<B>, actix_web::Error>) {
255 match outcome {
256 Ok(response) => {
257 if let Some(err) = response.response().error() {
258 emit_error_event(err.as_response_error(), response.status())
260 }
261 }
262 Err(error) => {
263 let response_error = error.as_response_error();
264 emit_error_event(response_error, response_error.status_code())
265 }
266 }
267}
268
269fn emit_error_event(response_error: &dyn ResponseError, status_code: StatusCode) {
270 let error_msg_prefix = "Error encountered while processing the incoming HTTP request";
271 if status_code.is_client_error() {
272 tracing::warn!("{}: {:?}", error_msg_prefix, response_error);
273 } else {
274 tracing::error!("{}: {:?}", error_msg_prefix, response_error);
275 }
276}