mod contextual_span;
mod make_span;
mod on_failure;
mod on_request;
mod on_response;
#[cfg(feature = "opentelemetry")]
mod otel_context;
use apalis_core::task::Task;
use std::{
fmt::{self, Debug},
pin::Pin,
task::{Context, Poll, ready},
time::Instant,
};
use tower::Service;
use tracing::{Level, Span};
pub use self::{
contextual_span::ContextualTaskSpan,
make_span::{DefaultMakeSpan, MakeSpan},
on_failure::{DefaultOnFailure, OnFailure},
on_request::{DefaultOnRequest, OnRequest},
on_response::{DefaultOnResponse, OnResponse},
};
pub use apalis_core::task::metadata::TracingContext;
#[cfg(feature = "opentelemetry")]
pub use otel_context::OtelTraceContext;
use tower::Layer;
// Module-wide default `tracing` levels.
// NOTE(review): neither constant is referenced in this file; presumably the
// `Default*` handlers in the sibling submodules read them via `super::` —
// confirm before changing or removing.
/// Default level named for regular (non-error) messages: `DEBUG`.
const DEFAULT_MESSAGE_LEVEL: Level = Level::DEBUG;
/// Default level named for error messages: `ERROR`.
const DEFAULT_ERROR_LEVEL: Level = Level::ERROR;
/// Unit of time in which a latency can be expressed.
///
/// NOTE(review): nothing in this file consumes the enum; presumably the default
/// response/failure handlers in the sibling modules use it to format
/// durations — confirm there.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it. It is a plain field-less enum and
/// therefore cheap to copy, compare and hash.
#[non_exhaustive]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum LatencyUnit {
    /// Seconds.
    Seconds,
    /// Milliseconds.
    Millis,
    /// Microseconds.
    Micros,
    /// Nanoseconds.
    Nanos,
}
/// A `tower` [`Layer`] that wraps a service in [`Trace`].
///
/// The layer only stores the four hooks; `Layer::layer` clones them into every
/// [`Trace`] service it produces. Each type parameter defaults to the matching
/// `Default*` implementation, so a plain `TraceLayer` uses the default hooks.
#[derive(Debug, Copy, Clone)]
pub struct TraceLayer<
MakeSpan = DefaultMakeSpan,
OnRequest = DefaultOnRequest,
OnResponse = DefaultOnResponse,
OnFailure = DefaultOnFailure,
> {
/// Creates the span for each incoming task.
pub(crate) make_span: MakeSpan,
/// Called inside the span right before the inner service is invoked.
pub(crate) on_request: OnRequest,
/// Called when the inner future resolves to `Ok`.
pub(crate) on_response: OnResponse,
/// Called when the inner future resolves to `Err`.
pub(crate) on_failure: OnFailure,
}
impl TraceLayer {
    /// Creates a layer that uses the default span factory and the default
    /// request, response and failure hooks.
    pub fn new() -> Self {
        let make_span = DefaultMakeSpan::new();
        let on_failure = DefaultOnFailure::default();
        let on_request = DefaultOnRequest::default();
        let on_response = DefaultOnResponse::default();

        Self {
            make_span,
            on_request,
            on_response,
            on_failure,
        }
    }
}
impl Default for TraceLayer {
fn default() -> Self {
Self::new()
}
}
impl<MakeSpan, OnRequest, OnResponse, OnFailure>
    TraceLayer<MakeSpan, OnRequest, OnResponse, OnFailure>
{
    /// Returns a layer with the request hook replaced by `new_on_request`;
    /// the other three hooks are carried over unchanged.
    pub fn on_request<NewOnRequest>(
        self,
        new_on_request: NewOnRequest,
    ) -> TraceLayer<MakeSpan, NewOnRequest, OnResponse, OnFailure> {
        let Self {
            make_span,
            on_response,
            on_failure,
            ..
        } = self;
        TraceLayer {
            make_span,
            on_request: new_on_request,
            on_response,
            on_failure,
        }
    }

    /// Returns a layer with the response hook replaced by `new_on_response`;
    /// the other three hooks are carried over unchanged.
    pub fn on_response<NewOnResponse>(
        self,
        new_on_response: NewOnResponse,
    ) -> TraceLayer<MakeSpan, OnRequest, NewOnResponse, OnFailure> {
        let Self {
            make_span,
            on_request,
            on_failure,
            ..
        } = self;
        TraceLayer {
            make_span,
            on_request,
            on_response: new_on_response,
            on_failure,
        }
    }

    /// Returns a layer with the failure hook replaced by `new_on_failure`;
    /// the other three hooks are carried over unchanged.
    pub fn on_failure<NewOnFailure>(
        self,
        new_on_failure: NewOnFailure,
    ) -> TraceLayer<MakeSpan, OnRequest, OnResponse, NewOnFailure> {
        let Self {
            make_span,
            on_request,
            on_response,
            ..
        } = self;
        TraceLayer {
            make_span,
            on_request,
            on_response,
            on_failure: new_on_failure,
        }
    }

    /// Returns a layer with the span factory replaced by `new_make_span`;
    /// the request, response and failure hooks are carried over unchanged.
    pub fn make_span_with<NewMakeSpan>(
        self,
        new_make_span: NewMakeSpan,
    ) -> TraceLayer<NewMakeSpan, OnRequest, OnResponse, OnFailure> {
        let Self {
            on_request,
            on_response,
            on_failure,
            ..
        } = self;
        TraceLayer {
            make_span: new_make_span,
            on_request,
            on_response,
            on_failure,
        }
    }
}
/// Turns any service `S` into a [`Trace`] service carrying clones of this
/// layer's hooks.
impl<S, MakeSpan, OnRequest, OnResponse, OnFailure> Layer<S>
    for TraceLayer<MakeSpan, OnRequest, OnResponse, OnFailure>
where
    MakeSpan: Clone,
    OnRequest: Clone,
    OnResponse: Clone,
    OnFailure: Clone,
{
    type Service = Trace<S, MakeSpan, OnRequest, OnResponse, OnFailure>;

    /// Wraps `inner`, cloning every hook so the layer itself can be reused.
    fn layer(&self, inner: S) -> Self::Service {
        let make_span = self.make_span.clone();
        let on_request = self.on_request.clone();
        let on_response = self.on_response.clone();
        let on_failure = self.on_failure.clone();

        Trace {
            inner,
            make_span,
            on_request,
            on_response,
            on_failure,
        }
    }
}
/// Middleware service that adds `tracing` instrumentation around an inner
/// service. Produced by [`TraceLayer`] or built directly with [`Trace::new`].
///
/// For every task it creates a span with `make_span`, invokes `on_request`
/// inside that span before calling the inner service, and passes clones of
/// `on_response` / `on_failure` to the returned [`ResponseFuture`].
#[derive(Debug, Clone, Copy)]
pub struct Trace<
S,
MakeSpan = DefaultMakeSpan,
OnRequest = DefaultOnRequest,
OnResponse = DefaultOnResponse,
OnFailure = DefaultOnFailure,
> {
/// The wrapped service that actually handles the task.
pub(crate) inner: S,
/// Creates the span for each incoming task.
pub(crate) make_span: MakeSpan,
/// Called inside the span right before `inner` is invoked.
pub(crate) on_request: OnRequest,
/// Cloned into each response future; called when it resolves to `Ok`.
pub(crate) on_response: OnResponse,
/// Cloned into each response future; called when it resolves to `Err`.
pub(crate) on_failure: OnFailure,
}
impl<S> Trace<S> {
    /// Wraps `inner` with the default span factory and the default request,
    /// response and failure hooks.
    pub fn new(inner: S) -> Self {
        let make_span = DefaultMakeSpan::new();
        let on_request = DefaultOnRequest::default();
        let on_response = DefaultOnResponse::default();
        let on_failure = DefaultOnFailure::default();

        Self {
            inner,
            make_span,
            on_request,
            on_response,
            on_failure,
        }
    }

    /// Returns a default [`TraceLayer`], i.e. the layer that produces
    /// `Trace` services.
    pub fn layer() -> TraceLayer {
        TraceLayer::new()
    }
}
impl<S, MakeSpan, OnRequest, OnResponse, OnFailure>
    Trace<S, MakeSpan, OnRequest, OnResponse, OnFailure>
{
    /// Borrows the wrapped service.
    pub fn get_ref(&self) -> &S {
        let Self { inner, .. } = self;
        inner
    }

    /// Mutably borrows the wrapped service.
    pub fn get_mut(&mut self) -> &mut S {
        let Self { inner, .. } = self;
        inner
    }

    /// Consumes the middleware and returns the wrapped service.
    pub fn into_inner(self) -> S {
        let Self { inner, .. } = self;
        inner
    }

    /// Returns a service with the request hook replaced by `new_on_request`;
    /// the inner service and the other hooks are carried over unchanged.
    pub fn on_request<NewOnRequest>(
        self,
        new_on_request: NewOnRequest,
    ) -> Trace<S, MakeSpan, NewOnRequest, OnResponse, OnFailure> {
        let Self {
            inner,
            make_span,
            on_response,
            on_failure,
            ..
        } = self;
        Trace {
            inner,
            make_span,
            on_request: new_on_request,
            on_response,
            on_failure,
        }
    }

    /// Returns a service with the response hook replaced by
    /// `new_on_response`; the inner service and the other hooks are carried
    /// over unchanged.
    pub fn on_response<NewOnResponse>(
        self,
        new_on_response: NewOnResponse,
    ) -> Trace<S, MakeSpan, OnRequest, NewOnResponse, OnFailure> {
        let Self {
            inner,
            make_span,
            on_request,
            on_failure,
            ..
        } = self;
        Trace {
            inner,
            make_span,
            on_request,
            on_response: new_on_response,
            on_failure,
        }
    }

    /// Returns a service with the failure hook replaced by `new_on_failure`;
    /// the inner service and the other hooks are carried over unchanged.
    pub fn on_failure<NewOnFailure>(
        self,
        new_on_failure: NewOnFailure,
    ) -> Trace<S, MakeSpan, OnRequest, OnResponse, NewOnFailure> {
        let Self {
            inner,
            make_span,
            on_request,
            on_response,
            ..
        } = self;
        Trace {
            inner,
            make_span,
            on_request,
            on_response,
            on_failure: new_on_failure,
        }
    }

    /// Returns a service with the span factory replaced by `new_make_span`;
    /// the inner service and the three hooks are carried over unchanged.
    pub fn make_span_with<NewMakeSpan>(
        self,
        new_make_span: NewMakeSpan,
    ) -> Trace<S, NewMakeSpan, OnRequest, OnResponse, OnFailure> {
        let Self {
            inner,
            on_request,
            on_response,
            on_failure,
            ..
        } = self;
        Trace {
            inner,
            make_span: new_make_span,
            on_request,
            on_response,
            on_failure,
        }
    }
}
/// `tower::Service` implementation: handles a [`Task`] by delegating to the
/// inner service while running the tracing hooks around it.
impl<Args, S, OnRequestT, OnResponseT, OnFailureT, MakeSpanT, F, Res, Ctx, IdType>
    Service<Task<Args, Ctx, IdType>> for Trace<S, MakeSpanT, OnRequestT, OnResponseT, OnFailureT>
where
    S: Service<Task<Args, Ctx, IdType>, Response = Res, Future = F> + Unpin + Send + 'static,
    S::Error: fmt::Display + 'static,
    MakeSpanT: MakeSpan<Args, Ctx, IdType>,
    OnRequestT: OnRequest<Args, Ctx, IdType>,
    OnResponseT: OnResponse<Res> + Clone + 'static,
    F: Future<Output = Result<Res, S::Error>> + 'static,
    OnFailureT: OnFailure<S::Error> + Clone + 'static,
{
    type Response = Res;
    type Error = S::Error;
    type Future = ResponseFuture<F, OnResponseT, OnFailureT>;

    /// Readiness is decided entirely by the inner service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Builds the task's span, starts the latency clock, then — with the span
    /// entered — fires the request hook and calls the inner service. The
    /// resulting future is wrapped in a [`ResponseFuture`] that owns the span,
    /// the start time and clones of the response/failure hooks.
    fn call(&mut self, req: Task<Args, Ctx, IdType>) -> Self::Future {
        let span = self.make_span.make_span(&req);
        let start = Instant::now();

        // The span is entered only for the duration of the closure, so both
        // the request hook and the inner `call` run inside it.
        let inner = span.in_scope(|| {
            self.on_request.on_request(&req, &span);
            self.inner.call(req)
        });

        ResponseFuture {
            inner,
            span,
            on_response: Some(self.on_response.clone()),
            on_failure: Some(self.on_failure.clone()),
            start,
        }
    }
}
#[pin_project::pin_project]
#[derive(Debug)]
/// Future returned by the `Service` implementation of [`Trace`].
///
/// It polls the inner future with `span` entered and, once that future
/// completes, reports the elapsed time since `start` to either the response
/// hook or the failure hook.
pub struct ResponseFuture<F, OnResponse, OnFailure> {
/// The inner service's future; structurally pinned.
#[pin]
pub(crate) inner: F,
/// Span entered on every poll and passed to the hooks.
pub(crate) span: Span,
/// Response hook; taken out of the `Option` when the future resolves to `Ok`.
pub(crate) on_response: Option<OnResponse>,
/// Failure hook; taken out of the `Option` when the future resolves to `Err`.
pub(crate) on_failure: Option<OnFailure>,
/// Instant captured in `Trace::call`, used to compute the latency.
pub(crate) start: Instant,
}
/// Drives the inner future inside the task's span and fires the matching hook
/// once it completes.
impl<Fut, OnResponseT, OnFailureT, Res, E> Future for ResponseFuture<Fut, OnResponseT, OnFailureT>
where
    Fut: Future<Output = Result<Res, E>>,
    OnResponseT: OnResponse<Res>,
    OnFailureT: OnFailure<E>,
{
    type Output = Result<Res, E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        // Keep the span entered for the whole poll, hooks included.
        let _entered = this.span.enter();

        let outcome = ready!(this.inner.poll(cx));
        let elapsed = this.start.elapsed();

        // Hooks only borrow the value, so the result is handed back untouched.
        match &outcome {
            Ok(value) => {
                if let Some(handler) = this.on_response.take() {
                    handler.on_response(value, elapsed, this.span);
                }
            }
            Err(error) => {
                if let Some(mut handler) = this.on_failure.take() {
                    handler.on_failure(error, elapsed, this.span);
                }
            }
        }

        Poll::Ready(outcome)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layers::WorkerBuilderExt;
    use apalis_core::{
        backend::{TaskSink, memory::MemoryStorage},
        error::BoxDynError,
        task::{extensions::Extensions, task_id::RandomId},
        worker::{
            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
        },
    };
    use std::time::Duration;

    /// Task handler shared by both tests: asks the worker to stop as soon as
    /// it receives the value `42`, so `worker.run()` can return.
    async fn stop_worker_task(task: u32, worker: WorkerContext) -> Result<(), BoxDynError> {
        if task == 42 {
            println!("Stopping worker from task");
            worker.stop().unwrap();
        }
        Ok(())
    }

    /// Runs a worker with the tracing enabled through `enable_tracing()`.
    #[tokio::test]
    async fn basic_worker_tracing() {
        let mut storage = MemoryStorage::new();
        storage.push(42).await.unwrap();

        let worker = WorkerBuilder::new("rango-tango")
            .backend(storage)
            .enable_tracing()
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            .build(stop_worker_task);

        worker.run().await.unwrap();
    }

    /// Runs a worker with a `TraceLayer` whose span factory, request hook and
    /// response hook are all replaced by closures.
    #[tokio::test]
    async fn custom_worker_tracing() {
        let mut storage = MemoryStorage::new();
        storage.push(42).await.unwrap();

        let trace_layer = TraceLayer::new()
            .make_span_with(|req: &Task<u32, Extensions, RandomId>| {
                tracing::span!(
                    tracing::Level::INFO,
                    "custom_span",
                    task_id = req.parts.task_id.as_ref().unwrap().to_string()
                )
            })
            .on_request(
                |task: &Task<u32, Extensions, RandomId>, span: &tracing::Span| {
                    tracing::info!(parent: span, "Custom OnRequest: Received task: {:?}", task);
                },
            )
            .on_response(|_: &(), duration: Duration, span: &tracing::Span| {
                tracing::info!(parent: span, "Custom OnResponse: Completed in {:?}", duration);
            });

        let worker = WorkerBuilder::new("rango-tango")
            .backend(storage)
            .layer(trace_layer)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            .build(stop_worker_task);

        worker.run().await.unwrap();
    }
}