1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use std::task::{Context, Poll};
use tower::Service;

use crate::request::JobRequest;

/// Extension data for jobs.
///
/// forked from [axum::Extensions]
/// # In Context
///
/// This is commonly used to share state across jobs.
///
/// ```rust,ignore
/// use apalis::{
///     Extension,
///     WorkerBuilder,
///     JobContext
/// };
/// use std::sync::Arc;
///
/// // Some shared state used throughout our application
/// struct State {
///     // ...
/// }
///
/// async fn email_service(email: Email, ctx: JobContext) {
///     let state: &Arc<State> = ctx.data_opt().unwrap();
/// }
///
/// let state = Arc::new(State { /* ... */ });
///
/// let worker = WorkerBuilder::new(storage)
///     .layer(Extension(state))
///     .build_fn(email_service);
/// ```

#[derive(Debug, Clone, Copy)]
pub struct Extension<T>(pub T);

impl<S, T> tower::Layer<S> for Extension<T>
where
    T: Clone + Send + Sync + 'static,
{
    type Service = AddExtension<S, T>;

    fn layer(&self, inner: S) -> Self::Service {
        AddExtension {
            inner,
            value: self.0.clone(),
        }
    }
}

/// Middleware for adding some shareable value to [request extensions].
///
/// See [Sharing state with handlers](index.html#sharing-state-with-handlers)
/// for more details.
///
/// [request extensions]: https://docs.rs/http/latest/http/struct.Extensions.html
#[derive(Clone, Copy, Debug)]
pub struct AddExtension<S, T> {
    pub(crate) inner: S,
    pub(crate) value: T,
}

impl<J, S, T> Service<JobRequest<J>> for AddExtension<S, T>
where
    S: Service<JobRequest<J>>,
    T: Clone + Send + Sync + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    #[inline]
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, mut req: JobRequest<J>) -> Self::Future {
        req.context_mut().insert(self.value.clone());
        self.inner.call(req)
    }
}