fang 0.7.0

Background job processing library for Rust
<p align="center"><img src="logo.png" alt="fang" height="300px"></p>

[![Crates.io][s1]][ci] [![docs page][docs-badge]][docs] ![test][ga-test] ![style][ga-style]

# Fang

Background task processing library for Rust. It uses Postgres DB as a task queue.

## Features
- The `asynk` feature uses `tokio`. Workers are started in tokio tasks.
- The `blocking` feature uses `std::thread`. Workers are started in separate threads.

## Installation

1. Add this to your Cargo.toml


#### Blocking feature
```toml
[dependencies]
fang = { version = "0.7" , features = ["blocking"]}
```

#### Asynk feature
```toml
[dependencies]
fang = { version = "0.7" , features = ["asynk"]}
```
*Supports rustc 1.62+*

2. Create the `fang_tasks` table in the Postgres database. The migration can be found in [the migrations directory](https://github.com/ayrat555/fang/blob/master/migrations/2021-06-05-112912_create_fang_tasks/up.sql).

## Usage

### Defining a task

#### Blocking feature
Every task should implement the `fang::Runnable` trait, which `fang` uses to execute it.

```rust
use fang::Error;
use fang::Runnable;
use fang::typetag;
use fang::PgConnection;
use fang::serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
struct MyTask {
    pub number: u16,
}

#[typetag::serde]
impl Runnable for MyTask {
    fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
        println!("the number is {}", self.number);

        Ok(())
    }
}
```

As you can see from the example above, the trait implementation has the `#[typetag::serde]` attribute, which is used to deserialize the task.

The second parameter of the `run` function is diesel's `PgConnection`. You can reuse it to manipulate the task queue, for example to add a new job while the current job is executing, or in your own queries if you're using diesel. If you don't need it, just ignore it.


#### Asynk feature
Every task should implement the `fang::AsyncRunnable` trait, which `fang` uses to execute it.

Also be careful not to give two implementations of `AsyncRunnable` the same name, because this causes `typetag` to fail.
```rust
use fang::AsyncRunnable;
use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_runnable::Error;
use fang::serde::{Deserialize, Serialize};
use fang::async_trait;
use fang::typetag;

#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
struct AsyncTask {
  pub number: u16,
}

#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncTask {
    async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
        Ok(())
    }
    // Implementing this function is optional.
    // The default task type is `common`.
    fn task_type(&self) -> String {
        "my-task-type".to_string()
    }
}
```
### Enqueuing a task

#### Blocking feature
To enqueue a task, use `Queue::enqueue_task`:


```rust
use fang::Queue;

...

Queue::enqueue_task(&MyTask { number: 10 }).unwrap();

```

The example above creates a new Postgres connection on every call. If you want to reuse the same Postgres connection to enqueue several tasks, use a `Queue` struct instance:

```rust
let queue = Queue::new();

for id in &unsynced_feed_ids {
    queue.push_task(&SyncFeedMyTask { feed_id: *id }).unwrap();
}

```

Or you can use `PgConnection` struct:

```rust
Queue::push_task_query(pg_connection, &new_task).unwrap();
```

#### Asynk feature
To enqueue a task, use `AsyncQueueable::insert_task`.
Depending on the backend you prefer, you will need to do it with a specific queue.

For the Postgres backend:
```rust
use fang::asynk::async_queue::AsyncQueue;
use fang::NoTls;
use fang::AsyncRunnable;

// Create an AsyncQueue
let max_pool_size: u32 = 2;

let mut queue = AsyncQueue::builder()
    // Postgres database url
    .uri("postgres://postgres:postgres@localhost/fang")
    // Maximum number of connections allowed in the pool
    .max_pool_size(max_pool_size)
    // Set to false if you would like tasks to be unique
    .duplicated_tasks(true)
    .build();

// Always connect first in order to perform any operation
queue.connect(NoTls).await.unwrap();

```
For simplicity, the example uses the `NoTls` type. If you would like to encrypt Postgres traffic, you can implement a TLS type instead.

This is well documented for [openssl](https://docs.rs/postgres-openssl/latest/postgres_openssl/) and [native-tls](https://docs.rs/postgres-native-tls/latest/postgres_native_tls/).

```rust
// AsyncTask from the first example
let task = AsyncTask { number: 8 };
let task_returned = queue
  .insert_task(&task as &dyn AsyncRunnable)
  .await
  .unwrap();
```

### Starting workers

#### Blocking feature
Every worker runs in a separate thread. If a worker panics, it is always restarted.

Use `WorkerPool` to start workers. `WorkerPool::new` accepts one parameter - the number of workers.


```rust
use fang::WorkerPool;

WorkerPool::new(10).start();
```
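The restart-on-panic behaviour described above can be pictured with `std::thread` alone. The following is a minimal sketch under stated assumptions, not fang's internal code: the function name `run_with_restart` is hypothetical, and fang's own restart mechanism may differ.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: runs `work` in its own thread and starts a fresh
/// thread whenever the previous one panics.
/// Returns how many threads were started in total.
pub fn run_with_restart<F>(work: F) -> usize
where
    F: Fn() + Send + Sync + 'static,
{
    let work = Arc::new(work);
    let mut started = 0;
    loop {
        let work = Arc::clone(&work);
        started += 1;
        // `join` returns `Err` if the spawned thread panicked.
        if thread::spawn(move || (*work)()).join().is_ok() {
            return started;
        }
    }
}
```

A closure that panics on its first two calls and succeeds on the third would make this function start three threads.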

Use `shutdown` to stop the worker threads. They will try to finish in-progress tasks first.

```rust

use fang::WorkerPool;

let mut worker_pool = WorkerPool::new(10);
worker_pool.start().unwrap();

worker_pool.shutdown().unwrap();
```

Using a library like [signal-hook][signal-hook], it's possible to gracefully shut down a worker. See the
[Simple Worker example](https://github.com/ayrat555/fang/tree/master/fang_examples/simple_worker) for an example implementation.

#### Asynk feature
Every worker runs in a separate tokio task. If a worker panics, it is always restarted.
Use `AsyncWorkerPool` to start workers.

```rust
use fang::asynk::async_worker_pool::AsyncWorkerPool;

// A connected queue is required (see the enqueuing example)
// Insert some tasks into it as well

let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
        .number_of_workers(max_pool_size)
        .queue(queue.clone())
        .build();

pool.start().await;
```


Check out:

- [Simple Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/simple_worker) - simple worker example
- [Simple Async Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/simple_async_worker) - simple async worker example
- [El Monitorro](https://github.com/ayrat555/el_monitorro) - telegram feed reader. It uses Fang to synchronize feeds and deliver updates to users.

### Configuration

#### Blocking feature

To configure workers, use `WorkerPool::new_with_params` instead of `WorkerPool::new`, which uses default values. It accepts two parameters - the number of workers and a `WorkerParams` struct.

#### Asynk feature

Use the builder provided for `AsyncWorkerPool` (`AsyncWorkerPool::builder()`).

### Configuring the type of workers

#### Blocking feature

You can start workers for a specific type of task. These workers will execute only tasks of the specified type.

Add a `task_type` method to the `Runnable` trait implementation:

```rust
...

#[typetag::serde]
impl Runnable for MyTask {
    fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
        println!("the number is {}", self.number);

        Ok(())
    }

    fn task_type(&self) -> String {
        "number".to_string()
    }
}
```

Set `task_type` in the `WorkerParams`:

```rust
let mut worker_params = WorkerParams::new();
worker_params.set_task_type("number".to_string());

WorkerPool::new_with_params(10, worker_params).start();
```

Without setting `task_type` workers will be executing any type of task.
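The effect of `task_type` on which tasks a worker picks up can be sketched as a simple filter. This is only an illustration of the behaviour described above, not fang's query logic: `QueuedTask` and `next_task` are hypothetical names, and the real selection happens in the database.

```rust
/// Hypothetical, simplified view of a queued task: only its type matters here.
pub struct QueuedTask {
    pub id: u32,
    pub task_type: String,
}

/// Picks the first task a worker may run.
/// `Some(t)` restricts the worker to tasks of type `t`;
/// `None` lets the worker run any type of task.
pub fn next_task<'a>(
    queue: &'a [QueuedTask],
    worker_task_type: Option<&str>,
) -> Option<&'a QueuedTask> {
    queue.iter().find(|task| match worker_task_type {
        Some(wanted) => task.task_type == wanted,
        None => true,
    })
}
```

With a queue holding a `common` task followed by a `number` task, a worker restricted to `number` would skip the first task, while an unrestricted worker would take it.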


#### Asynk feature

The same as for the blocking feature.

Set the task type through the `AsyncWorker` builder.

### Configuring retention mode

By default, all successfully finished tasks are removed from the DB, while failed tasks are kept.

There are three retention modes you can use:

```rust
pub enum RetentionMode {
    KeepAll,        // doesn't remove tasks
    RemoveAll,      // removes all tasks
    RemoveFinished, // default value
}
```
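The three modes can be read as a decision made after each task has run. The sketch below restates the rules from this section as a pure function. `Outcome` and `should_remove` are hypothetical names introduced for illustration and are not part of fang's API.

```rust
/// Same variants as the `RetentionMode` enum shown above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RetentionMode {
    KeepAll,
    RemoveAll,
    RemoveFinished,
}

/// Hypothetical result of running a task.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Outcome {
    Finished,
    Failed,
}

/// Returns true if the task should be deleted from the DB after it has run.
pub fn should_remove(mode: RetentionMode, outcome: Outcome) -> bool {
    match (mode, outcome) {
        (RetentionMode::KeepAll, _) => false,
        (RetentionMode::RemoveAll, _) => true,
        // Default: only successfully finished tasks are removed.
        (RetentionMode::RemoveFinished, Outcome::Finished) => true,
        (RetentionMode::RemoveFinished, Outcome::Failed) => false,
    }
}
```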

Set retention mode with `set_retention_mode`:

#### Blocking feature

```rust
let mut worker_params = WorkerParams::new();
worker_params.set_retention_mode(RetentionMode::RemoveAll);

WorkerPool::new_with_params(10, worker_params).start();
```
#### Asynk feature

Set it through the `AsyncWorker` builder.

### Configuring sleep values

#### Blocking feature

You can use `SleepParams` to configure sleep values:

```rust
pub struct SleepParams {
    pub sleep_period: u64,     // default value is 5
    pub max_sleep_period: u64, // default value is 15
    pub min_sleep_period: u64, // default value is 5
    pub sleep_step: u64,       // default value is 5
}
```

If there are no tasks in the DB, a worker sleeps for `sleep_period` and each time this value increases by `sleep_step` until it reaches `max_sleep_period`. `min_sleep_period` is the initial value for `sleep_period`. All values are in seconds.
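The back-off described above can be traced with plain integers. This is a sketch of the documented behaviour rather than fang's internal code: the helpers `maybe_increase`, `reset` and `idle_schedule` are hypothetical names, and resetting to `min_sleep_period` once a task is found is an assumption based on it being the initial value.

```rust
/// Mirrors the fields of the `SleepParams` struct shown above (values in seconds).
#[derive(Debug, Clone, PartialEq)]
pub struct SleepParams {
    pub sleep_period: u64,
    pub max_sleep_period: u64,
    pub min_sleep_period: u64,
    pub sleep_step: u64,
}

impl SleepParams {
    /// Hypothetical helper: grow the period by one step, capped at the maximum.
    pub fn maybe_increase(&mut self) {
        self.sleep_period = (self.sleep_period + self.sleep_step).min(self.max_sleep_period);
    }

    /// Hypothetical helper: return to the initial period
    /// (assumed to happen once a task is found).
    pub fn reset(&mut self) {
        self.sleep_period = self.min_sleep_period;
    }
}

/// Returns the periods a worker would sleep for over `idle_polls`
/// consecutive polls that find no task.
pub fn idle_schedule(mut params: SleepParams, idle_polls: usize) -> Vec<u64> {
    let mut slept = Vec::with_capacity(idle_polls);
    for _ in 0..idle_polls {
        slept.push(params.sleep_period);
        params.maybe_increase();
    }
    slept
}
```

With the default values, four empty polls in a row would sleep for 5, 10, 15 and 15 seconds.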


Use `set_sleep_params` to set them:
```rust
let sleep_params = SleepParams {
    sleep_period: 2,
    max_sleep_period: 6,
    min_sleep_period: 2,
    sleep_step: 1,
};
let mut worker_params = WorkerParams::new();
worker_params.set_sleep_params(sleep_params);

WorkerPool::new_with_params(10, worker_params).start();
```
#### Asynk feature

Set it through the `AsyncWorker` builder.

## Periodic Tasks

Fang can add tasks to `fang_tasks` periodically. To use this feature, first run [the migration with the `fang_periodic_tasks` table](https://github.com/ayrat555/fang/tree/master/migrations/2021-07-24-050243_create_fang_periodic_tasks/up.sql).

Usage example:

#### Blocking feature

```rust
use fang::Scheduler;
use fang::Queue;

let queue = Queue::new();

queue
     .push_periodic_task(&SyncMyTask::default(), 120)
     .unwrap();

queue
     .push_periodic_task(&DeliverMyTask::default(), 60)
     .unwrap();

Scheduler::start(10, 5);
```

In the example above, `push_periodic_task` saves the specified task to the `fang_periodic_tasks` table. The task will then be enqueued (saved to the `fang_tasks` table) every specified number of seconds.

`Scheduler::start(10, 5)` starts the scheduler. It accepts two parameters:
- DB check period in seconds
- Acceptable error limit in seconds - |current_time - scheduled_time| < error
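The error-limit condition can be written out as a small predicate. This is a sketch of the inequality above using plain second counts. The function name `within_error_margin` is hypothetical and is not part of fang's API.

```rust
/// Returns true when `scheduled_time` is close enough to `current_time`
/// for the task to be enqueued: |current_time - scheduled_time| < error.
/// All arguments are in seconds.
pub fn within_error_margin(current_time: u64, scheduled_time: u64, error: u64) -> bool {
    current_time.abs_diff(scheduled_time) < error
}
```

With an error limit of 5 seconds, a task scheduled 3 seconds before or after the current time would qualify, while one scheduled exactly 5 seconds away would not.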

#### Asynk feature
```rust
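use fang::asynk::async_scheduler::Scheduler;
use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_queue::AsyncQueue;
// Assumption: `Utc` and `OtherDuration` come from the `chrono` crate
use chrono::Duration as OtherDuration;
use chrono::Utc;

// Build an AsyncQueue as before

let schedule_in_future = Utc::now() + OtherDuration::seconds(5);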

let _periodic_task = queue.insert_periodic_task(
    &AsyncTask { number: 1 },
    schedule_in_future,
    10,
)
.await;

let check_period: u64 = 1;
let error_margin_seconds: u64 = 2;

let mut scheduler = Scheduler::builder()
    .check_period(check_period)
    .error_margin_seconds(error_margin_seconds)
    .queue(&mut queue as &mut dyn AsyncQueueable)
    .build();

// Add more tasks in another thread or before the loop

// Scheduler loop
scheduler.start().await.unwrap();
```


## Contributing

1. [Fork it!](https://github.com/ayrat555/fang/fork)
2. Create your feature branch (`git checkout -b my-new-feature`)
3. Commit your changes (`git commit -am 'Add some feature'`)
4. Push to the branch (`git push origin my-new-feature`)
5. Create new Pull Request

### Running tests locally

```
cargo install diesel_cli

docker run --rm -d --name postgres -p 5432:5432 \
  -e POSTGRES_DB=fang \
  -e POSTGRES_USER=postgres \
  -e POSTGRES_PASSWORD=postgres \
  postgres:latest

DATABASE_URL=postgres://postgres:postgres@localhost/fang diesel migration run

# Run regular tests
cargo test --all-features

# Run dirty/long tests, DB must be recreated afterwards
cargo test --all-features -- --ignored --test-threads=1

docker kill postgres
```

## Authors

- Ayrat Badykov (@ayrat555)
 
- Pepe Márquez (@pxp9)


[s1]: https://img.shields.io/crates/v/fang.svg
[docs-badge]: https://img.shields.io/badge/docs-website-blue.svg
[ci]: https://crates.io/crates/fang
[docs]: https://docs.rs/fang/
[ga-test]: https://github.com/ayrat555/fang/actions/workflows/rust.yml/badge.svg
[ga-style]: https://github.com/ayrat555/fang/actions/workflows/style.yml/badge.svg
[signal-hook]: https://crates.io/crates/signal-hook