rexecutor-sqlx 0.1.0

A robust job processing library
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
[![CI](https://github.com/Johnabell/rexecutor/actions/workflows/ci.yaml/badge.svg)](https://github.com/Johnabell/rexecutor/actions/workflows/ci.yaml) [![codecov](https://codecov.io/gh/Johnabell/rexecutor/graph/badge.svg?token=BpZUtSgnWT)](https://codecov.io/gh/Johnabell/rexecutor)

# Rexecutor

A robust job execution library for rust built on the tokio runtime.

For example usage see [postgres example](examples/postgres/src/main.rs).

## Setting up `Rexecutor`

To create an instance of `Rexecutor` you will need to have an implementation of `Backend`.

The rexecutor library only provides and in memory implementation `backend::memory::InMemoryBackend`
which is primarily provided for testing purposes. Instead a seperate crate implementing the Backend
should be used for example `rexecutor-sqlx`

## Creating executors

Jobs are defined by creating a struct/enum and implementing `Executor` for it.

### Example defining an executor

You can define and enqueue a job as follows:

```rust
use rexecutor::prelude::*;
use chrono::{Utc, TimeDelta};
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;
let backend = InMemoryBackend::new().paused();
Rexecutor::new(backend).set_global_backend().unwrap();
struct EmailJob;

#[async_trait::async_trait]
impl Executor for EmailJob {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_job";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        println!("{} running, with args: {}", Self::NAME, job.data);
        /// Do something important with an email
        ExecutionResult::Done
    }
}

tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let _ = EmailJob::builder()
        .with_data("bob.shuruncle@example.com".to_owned())
        .schedule_in(TimeDelta::hours(3))
        .enqueue()
        .await;

    assert_enqueued!(
        with_data: "bob.shuruncle@example.com".to_owned(),
        scheduled_after: Utc::now() + TimeDelta::minutes(170),
        scheduled_before: Utc::now() + TimeDelta::minutes(190),
        for_executor: EmailJob
    );
});
```

### Unique jobs

It is possible to ensure uniqueness of jobs based on certain criteria. This can be defined as
part of the implementation of `Executor` via `Executor::UNIQUENESS_CRITERIA` or when
inserting the job via `job::builder::JobBuilder::unique`.

For example to ensure that only one unique job is ran every five minutes it is possible to use
the following uniqueness criteria.

```rust
use rexecutor::prelude::*;
use chrono::{Utc, TimeDelta};
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;
let backend = InMemoryBackend::new().paused();
Rexecutor::new(backend).set_global_backend().unwrap();
struct UniqueJob;

#[async_trait::async_trait]
impl Executor for UniqueJob {
    type Data = ();
    type Metadata = ();
    const NAME: &'static str = "unique_job";
    const MAX_ATTEMPTS: u16 = 1;
    const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = Some(
        UniquenessCriteria::by_executor()
            .and_within(TimeDelta::seconds(300))
            .on_conflict(Replace::priority().for_statuses(&JobStatus::ALL)),
    );
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        println!("{} running, with args: {:?}", Self::NAME, job.data);
        // Do something important
        ExecutionResult::Done
    }
}

tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let _ = UniqueJob::builder().enqueue().await;
    let _ = UniqueJob::builder().enqueue().await;

    // Only one of jobs was enqueued
    assert_enqueued!(
        1 job,
        scheduled_before: Utc::now(),
        for_executor: UniqueJob
    );
});
```

Additionally it is possible to specify what action should be taken when there is a conflicting
job. In the example above the priority is override. For more details of how to use uniqueness
see `job::uniqueness_criteria::UniquenessCriteria`.


## Overriding `Executor` default values

When defining an `Executor` you specify the maximum number of attempts via
`Executor::MAX_ATTEMPTS`. However, when inserting a job it is possible to override this value
by calling `job::builder::JobBuilder::with_max_attempts` (if not called the max attempts will be equal to
`Executor::MAX_ATTEMPTS`).

Similarly, the executor can define a job uniqueness criteria via
`Executor::UNIQUENESS_CRITERIA`. However, using `job::builder::JobBuilder::unique` it is possible to
override this value for a specific job.

## Setting up executors to run

For each executor you would like to run `Rexecutor::with_executor` should be called. Being
explicit about this opens the possibility of having specific nodes in a cluster running as
worker nodes for certain enqueued jobs while other node not responsible for their execution.

### Example setting up executors

```rust
use rexecutor::prelude::*;
use std::str::FromStr;
use chrono::TimeDelta;
use rexecutor::backend::memory::InMemoryBackend;
pub(crate) struct RefreshWorker;
pub(crate) struct EmailScheduler;
pub(crate) struct RegistrationWorker;

#[async_trait::async_trait]
impl Executor for RefreshWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "refresh_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for EmailScheduler {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_scheduler";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for RegistrationWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "registration_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = InMemoryBackend::new();
    Rexecutor::new(backend)
        .with_executor::<RefreshWorker>()
        .with_executor::<EmailScheduler>()
        .with_executor::<RegistrationWorker>();
});
```

## Enqueuing jobs

Generally jobs will be enqueued using the `job::builder::JobBuilder` returned by `Executor::builder`.

When enqueuing jobs the data and metadata of the job can be specified. Additionally, the
default value of the `Executor` can be overriden.

### Overriding `Executor` default values

When defining an `Executor` you specify the maximum number of attempts via
`Executor::MAX_ATTEMPTS`. However, when inserting a job it is possible to override this value
by calling `job::builder::JobBuilder::with_max_attempts` (if not called the max attempts will be equal to
`Executor::MAX_ATTEMPTS`.

Similarly, the executor can define a job uniqueness criteria via
`Executor::UNIQUENESS_CRITERIA`. However, using `job::builder::JobBuilder::unique` it is possible to
override this value for a specific job.

### Example enqueuing a job

```rust
use rexecutor::prelude::*;
use std::sync::Arc;
use chrono::{Utc, TimeDelta};
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;
pub(crate) struct ExampleExecutor;

#[async_trait::async_trait]
impl Executor for ExampleExecutor {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "simple_executor";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = Arc::new(InMemoryBackend::new().paused());
    Rexecutor::new(backend.clone()).set_global_backend().unwrap();

    ExampleExecutor::builder()
        .with_max_attempts(2)
        .with_tags(vec!["initial_job", "delayed"])
        .with_data("First job".into())
        .schedule_in(TimeDelta::hours(2))
        .enqueue_to_backend(&backend)
        .await
        .unwrap();

    assert_enqueued!(
        to: backend,
        with_data: "First job".to_owned(),
        tagged_with: ["initial_job", "delayed"],
        scheduled_after: Utc::now() + TimeDelta::minutes(110),
        scheduled_before: Utc::now() + TimeDelta::minutes(130),
        for_executor: ExampleExecutor
    );
});
```

## Compile time scheduling of cron jobs

It can be useful to have jobs that run on a given schedule. Jobs like this can be setup using
either `Rexecutor::with_cron_executor` or `Rexecutor::with_cron_executor_for_timezone`. The
later is use to specify the specific timezone that the jobs should be scheduled to run in.

### Example setting up a UTC cron job

To setup a cron jobs to run every day at midnight you can use the following code.

```rust
use rexecutor::prelude::*;
use rexecutor::backend::{Backend, memory::InMemoryBackend};
struct CronJob;
#[async_trait::async_trait]
impl Executor for CronJob {
    type Data = String;
    type Metadata = ();
    const NAME: &'static str = "cron_job";
    const MAX_ATTEMPTS: u16 = 1;
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        /// Do something important
        ExecutionResult::Done
    }
}
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();

    let backend = InMemoryBackend::new();
    Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
});
```

## Pruning jobs

After jobs have completed, been cancelled, or discarded it is useful to be able to clean up.
To setup the job pruner `Rexecutor::with_job_pruner` should be called passing in the given
`PrunerConfig`.

Given the different ways in which jobs can finish it is often useful to be able to have fine
grained control over how old jobs should be cleaned up. `PrunerConfig` enables such control.

When constructing `PrunerConfig` a `cron::Schedule` is provided to specify when the pruner
should run.

Depending on the load/throughput of the system the pruner can be scheduled to run anywhere
from once a year through to multiple times per hour.

### Example configuring the job pruner

```rust
use rexecutor::prelude::*;
use std::str::FromStr;
use chrono::TimeDelta;
use rexecutor::backend::memory::InMemoryBackend;
pub(crate) struct RefreshWorker;
pub(crate) struct EmailScheduler;
pub(crate) struct RegistrationWorker;

#[async_trait::async_trait]
impl Executor for RefreshWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "refresh_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for EmailScheduler {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_scheduler";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for RegistrationWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "registration_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
    .with_max_concurrency(Some(2))
    .with_pruner(
        Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
            .only::<RefreshWorker>()
            .and::<EmailScheduler>(),
    )
    .with_pruner(
        Pruner::max_length(200, JobStatus::Discarded)
            .except::<RefreshWorker>()
            .and::<EmailScheduler>(),
    );

tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = InMemoryBackend::new();
    Rexecutor::new(backend)
        .with_executor::<RefreshWorker>()
        .with_executor::<EmailScheduler>()
        .with_executor::<RegistrationWorker>()
        .with_job_pruner(config);
});
```

## Shutting rexecutor down

To avoid jobs getting killed mid way through their executions it is important to make use of
graceful shutdown. This can either explicitly be called using `Rexecutor::graceful_shutdown`,
or via use of the `DropGuard` obtained via `Rexecutor::drop_guard`.

Using `Rexecutor::graceful_shutdown` or `Rexecutor::drop_guard` will ensure that all
currently executing jobs will be allowed time to complete before shutting rexecutor down.

### Example using the `DropGuard`

```rust
use rexecutor::prelude::*;
use std::str::FromStr;
use chrono::TimeDelta;
use rexecutor::backend::memory::InMemoryBackend;
pub(crate) struct RefreshWorker;
pub(crate) struct EmailScheduler;
pub(crate) struct RegistrationWorker;

#[async_trait::async_trait]
impl Executor for RefreshWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "refresh_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for EmailScheduler {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_scheduler";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for RegistrationWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "registration_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = InMemoryBackend::new();
    // Note this must be given a name to ensure it is dropped at the end of the scope.
    // See https://doc.rust-lang.org/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
    let _guard = Rexecutor::new(backend)
        .with_executor::<RefreshWorker>()
        .with_executor::<EmailScheduler>()
        .with_executor::<RegistrationWorker>()
        .drop_guard();
});
```

## Global backend

Rexecutor can be ran either with use of a global backend. This enables the use of the
convenience `job::builder::JobBuilder::enqueue` method which does not require a reference to
the backend to be passed down to the code that needs to enqueue a job.

The global backend can be set using `Rexecutor::set_global_backend` this should only be
called once otherwise it will return an error.

In fact for a single `Rexecutor` instance it is impossible to call this twice, the following code
snippet will fail to compile

```rust
use rexecutor::prelude::*;
let backend = rexecutor::backend::memory::InMemoryBackend::new();
Rexecutor::new(backend).set_global_backend().set_global_backend();
```

Note, using a global backend has many of the same drawbacks of any global variable in
particular it can make unit testing more difficult.

## Code of conduct

We follow the [Rust code of conduct](https://www.rust-lang.org/policies/code-of-conduct).

Currently the moderation team consists of John Bell only. We would welcome more members: if you would like to join the moderation team, please contact John Bell.

## Licence

The project is licensed under the [MIT license](https://github.com/Johnabell/atom_box/blob/master/LICENSE).