1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
use tower::{
    layer::util::{Identity, Stack},
    Layer,
};

#[cfg(feature = "filter")]
#[cfg_attr(docsrs, doc(cfg(feature = "filter")))]
use tower::filter::{AsyncFilterLayer, FilterLayer};

use std::{fmt::Debug, marker::PhantomData};

use crate::{
    job::JobStream,
    job_fn::{job_fn, JobFn},
    worker::Worker,
};

/// Configure and build a [Worker] job service.
///
/// `WorkerBuilder` collects all the components and configuration required to
/// build a job service. Once the service is defined, it can be built
/// with `build`.
///
/// # Examples
///
/// Defining a job service with the default [JobService];
///
/// ```rust,ignore
///
/// use apalis::prelude::*;
/// use apalis::sqlite::SqliteStorage;
///
/// let sqlite = SqliteStorage::new("sqlite::memory:").await.unwrap();
///
/// async fn email_service(job: JobRequest<Email>) -> Result<JobResult, JobError> {
///    Ok(JobResult::Success)
/// }
///
/// let addr = WorkerBuilder::new(sqlite)
///     .build_fn(email_service)
///     .start().await;
///
/// ```
///
///
/// Defining a middleware stack
///
/// ```rust,ignore
/// use apalis::layers::{
///    extensions::Extension,
///    retry::{JobRetryPolicy, RetryLayer},
/// };
///
/// use apalis::WorkerBuilder;
/// use apalis::sqlite::SqliteStorage;
///
/// let sqlite = SqliteStorage::connect("sqlite::memory:").await.unwrap();
///
/// #[derive(Clone)]
/// struct JobState {}
///
/// let addr = WorkerBuilder::new(sqlite)
///     .layer(RetryLayer::new(JobRetryPolicy))
///     .layer(Extension(JobState {}))
///     .build()
///     .start().await;
///
/// ```
#[derive(Debug)]
pub struct WorkerBuilder<T, S, M> {
    job: PhantomData<T>,
    pub(crate) layer: M,
    pub(crate) source: S,
}

impl<S> WorkerBuilder<(), S, Identity> {
    /// Build a new [WorkerBuilder] instance
    pub fn new(source: S) -> WorkerBuilder<S::Job, S, Identity>
    where
        S: JobStream,
    {
        let job: PhantomData<S::Job> = PhantomData;
        WorkerBuilder {
            job,
            layer: Identity::new(),
            source,
        }
    }
}

impl<T, S, M> WorkerBuilder<T, S, M> {
    /// Add a new layer `T` into the [WorkerBuilder].
    ///
    /// This wraps the inner service with the service provided by a user-defined
    /// [Layer]. The provided layer must implement the [Layer] trait.
    ///
    pub fn layer<U>(self, layer: U) -> WorkerBuilder<T, S, Stack<U, M>>
    where
        M: Layer<U>,
    {
        WorkerBuilder {
            job: self.job,
            source: self.source,
            layer: Stack::new(layer, self.layer),
        }
    }

    /// Conditionally reject requests based on `predicate`.
    ///
    /// `predicate` must implement the [`Predicate`] trait.
    ///
    /// This wraps the inner service with an instance of the [`Filter`]
    /// middleware.
    ///
    /// [`Filter`]: tower::filter::Filter
    #[cfg(feature = "filter")]
    #[cfg_attr(docsrs, doc(cfg(feature = "filter")))]
    pub fn filter<P>(self, predicate: P) -> WorkerBuilder<T, S, Stack<FilterLayer<P>, M>>
    where
        M: Layer<FilterLayer<P>>,
    {
        self.layer(FilterLayer::new(predicate))
    }

    /// Conditionally reject requests based on an asynchronous `predicate`.
    ///
    /// `predicate` must implement the [`AsyncPredicate`] trait.
    ///
    /// This wraps the inner service with an instance of the [`AsyncFilter`]
    /// middleware.
    /// [`AsyncFilter`]: tower::filter::
    #[cfg(feature = "filter")]
    pub fn filter_async<P>(self, predicate: P) -> WorkerBuilder<T, S, Stack<AsyncFilterLayer<P>, M>>
    where
        M: Layer<AsyncFilterLayer<P>>,
    {
        self.layer(AsyncFilterLayer::new(predicate))
    }

    /// Map one response type to another.
    ///
    /// This wraps the inner service with an instance of the [`MapResponse`]
    /// middleware.
    ///
    /// See the documentation for the [`map_response` combinator] for details.
    ///
    /// [`MapResponse`]: tower::util::MapResponse
    /// [`map_response` combinator]: tower::util::ServiceExt::map_response
    pub fn map_response<F>(
        self,
        f: F,
    ) -> WorkerBuilder<T, S, Stack<tower::util::MapResponseLayer<F>, M>>
    where
        M: Layer<tower::util::MapResponseLayer<F>>,
    {
        self.layer(tower::util::MapResponseLayer::new(f))
    }

    /// Map one error type to another.
    ///
    /// This wraps the inner service with an instance of the [`MapErr`]
    /// middleware.
    ///
    /// See the documentation for the [`map_err` combinator] for details.
    ///
    /// [`MapErr`]: tower::util::MapErr
    /// [`map_err` combinator]: tower::util::ServiceExt::map_err
    pub fn map_err<F>(self, f: F) -> WorkerBuilder<T, S, Stack<tower::util::MapErrLayer<F>, M>>
    where
        M: Layer<tower::util::MapErrLayer<F>>,
    {
        self.layer(tower::util::MapErrLayer::new(f))
    }
}

/// Helper trait for building new Workers from [WorkerBuilder]
pub trait WorkerFactory<S> {
    /// The worker to build
    type Worker: Worker;
    /// Builds a [WorkerFactory] using a [tower] service
    /// that can be used to generate new [Worker] actors using the `build` method
    /// # Arguments
    ///
    /// * `service` - A tower service
    ///
    /// # Examples
    ///
    fn build(self, service: S) -> Self::Worker;
}

/// Helper trait for building new Workers from [WorkerBuilder]

pub trait WorkerFactoryFn<F> {
    /// The worker build
    type Worker: Worker;
    /// Builds a [WorkerFactoryFn] using a [crate::job_fn::JobFn] service
    /// that can be used to generate new [Worker] actors using the `build` method
    /// # Arguments
    ///
    /// * `f` - A tower functional service
    ///
    /// # Examples
    ///
    fn build_fn(self, f: F) -> Self::Worker;
}

impl<W, F> WorkerFactoryFn<F> for W
where
    W: WorkerFactory<JobFn<F>>,
{
    type Worker = W::Worker;

    fn build_fn(self, f: F) -> Self::Worker {
        self.build(job_fn(f))
    }
}