1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use sentry_core::protocol;
use tower::Layer;
use tower::Service;

use crate::error::JobError;
use crate::job::{Job, JobId};
use crate::request::JobRequest;

/// Tower [`Layer`] that wraps a job service in a [`SentryJobService`].
///
/// For every job it handles, the wrapping service attaches the job's id,
/// current attempt and type to the Sentry scope and starts a performance
/// monitoring transaction that is finished once the job's future resolves.
///
/// The transaction automatically uses `J::NAME` as its name. When that is
/// not desirable, override the transaction name from inside the job handler
/// using the [`Scope::set_transaction`](sentry_core::Scope::set_transaction)
/// method.
#[derive(Debug, Default, Clone)]
pub struct SentryJobLayer;

impl SentryJobLayer {
    /// Creates a new Layer that only logs Job details.
    pub fn new() -> Self {
        Self::default()
    }
}

/// Tower Service that reports job details to Sentry.
///
/// It forwards every job request to the wrapped `service` and returns a
/// future that, when first polled, tags the Sentry scope with the job's
/// details and starts a performance monitoring transaction named after
/// `J::NAME`.
#[derive(Debug, Clone)]
pub struct SentryJobService<S> {
    /// The wrapped service that actually executes the job.
    service: S,
}

impl<S> Layer<S> for SentryJobLayer {
    type Service = SentryJobService<S>;

    fn layer(&self, service: S) -> Self::Service {
        Self::Service { service }
    }
}

/// Job metadata captured when the service is called and attached to the
/// Sentry scope on the first poll of the returned future.
struct JobDetails {
    /// Id of the job; reported in the `job` context and also used to derive
    /// the id of Sentry events.
    job_id: JobId,
    /// Attempt counter taken from the request (`request.attempts()`).
    current_attempt: i32,
    /// Rust type name of the job (`std::any::type_name::<J>()`), reported as
    /// the `job_type` tag.
    job_type: String,
}

pin_project_lite::pin_project! {
    /// The Future returned from [`SentryJobService`].
    ///
    /// It drives the wrapped service's future. On the first poll it
    /// configures the Sentry scope and starts a transaction; when the inner
    /// future completes it finishes the transaction and restores the span
    /// that was previously set on the scope.
    pub struct SentryHttpFuture<F> {
        // Job metadata and transaction context prepared in `call`. Taken
        // (left as `None`) on the first poll so the setup runs only once.
        on_first_poll: Option<(
            JobDetails,
            sentry_core::TransactionContext
        )>,
        // The running transaction together with the span that was on the
        // scope before it was started. `None` until the first poll and again
        // after the inner future has completed.
        transaction: Option<(
            sentry_core::TransactionOrSpan,
            Option<sentry_core::TransactionOrSpan>,
        )>,
        // The future produced by the wrapped service.
        #[pin]
        future: F,
    }
}

impl<F, Res> Future for SentryHttpFuture<F>
where
    F: Future<Output = Result<Res, JobError>> + 'static,
{
    type Output = F::Output;

    /// Polls the wrapped future, bracketing it with Sentry bookkeeping.
    ///
    /// * First poll: registers an event processor that overrides the event id
    ///   with one derived from the job id, sets the `job_type` tag and the
    ///   `job` context, starts a transaction and installs it as the scope's
    ///   span, remembering the span it replaced.
    /// * Completion: if the transaction has no status yet, it is marked `Ok`
    ///   or `InternalError` (capturing the error in the latter case); the
    ///   transaction is then finished and the remembered span is restored.
    ///
    /// The inner future's output is returned unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // One-time setup: `take()` leaves `None` behind, so later polls skip it.
        if let Some((details, context)) = this.on_first_poll.take() {
            // The event processor must own its copy of the id because it is
            // stored on the scope and outlives this call.
            let event_job_id = details.job_id.clone();
            // NOTE(review): nothing visible here removes the event processor
            // again — confirm the scope is isolated per job elsewhere.
            sentry_core::configure_scope(|scope| {
                scope.add_event_processor(move |mut event| {
                    event.event_id = uuid::Uuid::from_u128(event_job_id.inner().0);
                    Some(event)
                });
                scope.set_tag("job_type", details.job_type.to_string());

                let mut job_context = std::collections::BTreeMap::new();
                job_context.insert(String::from("job_id"), details.job_id.to_string().into());
                job_context.insert(
                    String::from("current_attempt"),
                    details.current_attempt.into(),
                );
                scope.set_context("job", sentry_core::protocol::Context::Other(job_context));

                let started: sentry_core::TransactionOrSpan =
                    sentry_core::start_transaction(context).into();
                // Remember whatever span was active so it can be restored later.
                let previous_span = scope.get_span();
                scope.set_span(Some(started.clone()));
                *this.transaction = Some((started, previous_span));
            });
        }

        let output = match this.future.poll(cx) {
            Poll::Ready(output) => output,
            Poll::Pending => return Poll::Pending,
        };

        if let Some((transaction, previous_span)) = this.transaction.take() {
            // Only fill in a status when nothing has set one already.
            if transaction.get_status().is_none() {
                let status = if let Err(err) = &output {
                    sentry_core::capture_error(err);
                    protocol::SpanStatus::InternalError
                } else {
                    protocol::SpanStatus::Ok
                };
                transaction.set_status(status);
            }
            transaction.finish();
            sentry_core::configure_scope(|scope| scope.set_span(previous_span));
        }

        Poll::Ready(output)
    }
}

impl<S, J, F, Res> Service<JobRequest<J>> for SentryJobService<S>
where
    S: Service<JobRequest<J>, Response = Res, Error = JobError, Future = F>,
    F: Future<Output = Result<Res, JobError>> + 'static,
    J: Job,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = SentryHttpFuture<S::Future>;

    /// Readiness is delegated entirely to the wrapped service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    /// Hands the request to the wrapped service and returns a future that
    /// performs the Sentry setup on its first poll.
    ///
    /// Nothing is reported to Sentry here; the job details and the
    /// transaction context (built from `J::NAME` and `"apalis.job"`) are
    /// only prepared and stored in the returned future.
    fn call(&mut self, request: JobRequest<J>) -> Self::Future {
        // Read everything needed from the request before it is moved into
        // the inner service.
        let details = JobDetails {
            job_id: request.id().clone(),
            current_attempt: request.attempts(),
            job_type: std::any::type_name::<J>().to_string(),
        };
        let context = sentry_core::TransactionContext::new(J::NAME, "apalis.job");
        let future = self.service.call(request);

        SentryHttpFuture {
            on_first_poll: Some((details, context)),
            transaction: None,
            future,
        }
    }
}