1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
use crate::{Backoff, BoxError, JobError, MaxRetries, NewJobInfo};
use serde::{de::DeserializeOwned, ser::Serialize};
use serde_json::Value;
use std::{future::Future, pin::Pin, time::SystemTime};
use tracing::{Instrument, Span};

/// The Job trait defines parameters pertaining to an instance of a background job
///
/// Jobs are definitions of work to be executed.
///
/// The simplest implementation defines the job's State, Error, and Future types, the NAME
/// constant, and the run method. Every other item has a default.
///
/// ### Example
///
/// ```rust
/// use background_jobs_core::{Job, new_job, BoxError};
/// use tracing::info;
/// use std::future::{ready, Ready};
///
/// #[derive(serde::Deserialize, serde::Serialize)]
/// struct MyJob {
///     count: i64,
/// }
///
/// impl Job for MyJob {
///     type State = ();
///     type Error = BoxError;
///     type Future = Ready<Result<(), BoxError>>;
///
///     const NAME: &'static str = "MyJob";
///
///     fn run(self, _: Self::State) -> Self::Future {
///         info!("Processing {}", self.count);
///
///         ready(Ok(()))
///     }
/// }
///
/// fn main() -> Result<(), BoxError> {
///     let job = new_job(MyJob { count: 1234 })?;
///
///     Ok(())
/// }
/// ```
pub trait Job: Serialize + DeserializeOwned + 'static {
    /// The application state provided to this job at runtime.
    type State: Clone + 'static;

    /// The error type this job returns
    ///
    /// It must be convertible into a `BoxError`.
    type Error: Into<BoxError>;

    /// The future returned by this job
    ///
    /// The future must be `Send` and resolve to `Result<(), Self::Error>`.
    type Future: Future<Output = Result<(), Self::Error>> + Send;

    /// The name of the job
    ///
    /// This name must be unique!!!
    const NAME: &'static str;

    /// The name of the default queue for this job
    ///
    /// This can be overridden on an individual-job level, but if a non-existent queue is supplied,
    /// the job will never be processed.
    const QUEUE: &'static str = "default";

    /// Define the default number of retries for this job
    ///
    /// Defaults to Count(5)
    /// Jobs can override
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(5);

    /// Define the default backoff strategy for this job
    ///
    /// Defaults to Exponential(2)
    /// Jobs can override
    const BACKOFF: Backoff = Backoff::Exponential(2);

    /// Define how often a job should update its heartbeat timestamp
    ///
    /// This is important for allowing the job server to reap processes that were started but never
    /// completed.
    ///
    /// Defaults to 5 seconds (the value is `5_000`, so the unit is presumably milliseconds;
    /// TODO confirm against the consumer of this value)
    /// Jobs can override
    const HEARTBEAT_INTERVAL: u64 = 5_000;

    /// Users of this library must define what it means to run a job.
    ///
    /// This should contain all the logic needed to complete a job. If that means queuing more
    /// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy
    /// processes, that logic should all be called from inside this method.
    ///
    /// The state passed into this job is initialized at the start of the application. The state
    /// argument could be useful for containing a hook into something like r2d2, or the address of
    /// an actor in an actix-based system.
    fn run(self, state: Self::State) -> Self::Future;

    /// Generate a Span that the job will be processed within
    ///
    /// Returns `None` by default.
    fn span(&self) -> Option<Span> {
        None
    }

    /// If this job should not use its default queue, this can be overridden in
    /// user-code.
    ///
    /// Returns `Self::QUEUE` by default.
    fn queue(&self) -> &str {
        Self::QUEUE
    }

    /// If this job should not use its default maximum retry count, this can be
    /// overridden in user-code.
    ///
    /// Returns `Self::MAX_RETRIES` by default.
    fn max_retries(&self) -> MaxRetries {
        Self::MAX_RETRIES
    }

    /// If this job should not use its default backoff strategy, this can be
    /// overridden in user-code.
    ///
    /// Returns `Self::BACKOFF` by default.
    fn backoff_strategy(&self) -> Backoff {
        Self::BACKOFF
    }

    /// Define how often a job should update its heartbeat timestamp
    ///
    /// This is important for allowing the job server to reap processes that were started but never
    /// completed.
    ///
    /// Returns `Self::HEARTBEAT_INTERVAL` by default.
    fn heartbeat_interval(&self) -> u64 {
        Self::HEARTBEAT_INTERVAL
    }
}

/// Build a [`NewJobInfo`] describing `job` so that it can be queued.
///
/// The job's name, queue, retry limit, backoff strategy and heartbeat interval are read from
/// the job itself, after which the job is serialized into a JSON value.
///
/// # Errors
///
/// Returns [`ToJson`], converted into a [`BoxError`], if the job cannot be serialized.
pub fn new_job<J>(job: J) -> Result<NewJobInfo, BoxError>
where
    J: Job,
{
    // Gather the metadata first: serialization consumes the job.
    let name = J::NAME.to_owned();
    let queue = job.queue().to_owned();
    let max_retries = job.max_retries();
    let backoff = job.backoff_strategy();
    let heartbeat_interval = job.heartbeat_interval();

    match serde_json::to_value(job) {
        Ok(args) => Ok(NewJobInfo::new(
            name,
            queue,
            max_retries,
            backoff,
            heartbeat_interval,
            args,
        )),
        // The serializer's error is deliberately replaced by the unit `ToJson` error.
        Err(_) => Err(ToJson.into()),
    }
}

/// Build a [`NewJobInfo`] for `job` and schedule it with the `after` timestamp.
///
/// # Errors
///
/// Propagates any error returned by [`new_job`].
pub fn new_scheduled_job<J>(job: J, after: SystemTime) -> Result<NewJobInfo, BoxError>
where
    J: Job,
{
    new_job(job).map(|mut info| {
        info.schedule(after);
        info
    })
}

/// Deserialize `args` into a job of type `J` and run it with the given `state`.
///
/// Deserialization and the call to [`Job::run`] happen immediately, before the returned future
/// is first polled; only the job's own future is awaited inside the returned future. When the
/// job supplies a span, `run` is called inside that span and the job's future is instrumented
/// with it.
///
/// # Errors
///
/// The returned future resolves to an error if `args` cannot be deserialized into `J`, or to
/// [`JobError::Processing`] if the job's future fails.
pub fn process<J>(
    args: Value,
    state: J::State,
) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>>
where
    J: Job,
{
    // Start the job up front so that only the job's future and its optional span are moved
    // into the boxed future below.
    let started = serde_json::from_value::<J>(args).map(move |job| match job.span() {
        Some(span) => {
            let running = span.in_scope(move || job.run(state));

            (running, Some(span))
        }
        None => (job.run(state), None),
    });

    Box::pin(async move {
        let (running, span) = started?;

        let outcome = match span {
            Some(span) => running.instrument(span).await,
            None => running.await,
        };

        outcome.map_err(|e| JobError::Processing(e.into()))
    })
}

#[derive(Clone, Debug, thiserror::Error)]
#[error("Failed to to turn job into value")]
pub struct ToJson;