Crate background_jobs_actix

An Actix-based Jobs Processor

This library will spin up as many actors as requested for each processor to process jobs concurrently. Keep in mind that, by default, spawned actors run on the same Arbiter, so in order to achieve parallel execution, multiple Arbiters must be in use.

The thread count is used to spawn Synchronous Actors to handle the storage of job information. For storage backends that cannot be parallelized, a thread count of 1 should be used. By default, the number of cores of the running system is used.

Example

use anyhow::Error;
use background_jobs::{create_server, Backoff, Job, MaxRetries, WorkerConfig};
use futures::future::{ok, Ready};

/// Name of the queue that the example job is assigned to and that the workers are sized for.
const DEFAULT_QUEUE: &str = "default";

/// State value handed to each job when it runs.
#[derive(Debug, Clone)]
pub struct MyState {
    /// Application name; printed by the example job.
    pub app_name: String,
}

/// Example job payload: two numbers that are printed when the job runs.
///
/// The serde derives make the payload serializable and deserializable.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MyJob {
    /// First example argument.
    some_usize: usize,
    /// Second example argument.
    other_usize: usize,
}

#[actix_rt::main]
async fn main() -> Result<(), Error> {
    // For this example, we use the default in-memory storage mechanism
    use background_jobs::memory_storage::Storage;

    // Start the application server on top of that storage. The server guards
    // access to the jobs store.
    let queue_handle = create_server(Storage::new());

    // Describe our workers first, then start them against the server
    let workers = WorkerConfig::new(move || MyState::new("My App"))
        .register::<MyJob>()
        .set_worker_count(DEFAULT_QUEUE, 16);
    workers.start(queue_handle.clone());

    // Queue our jobs, one per argument pair, in order
    for &(some_usize, other_usize) in &[(1, 2), (3, 4), (5, 6)] {
        queue_handle.queue(MyJob::new(some_usize, other_usize))?;
    }

    // Wait here until a Ctrl-C signal arrives
    actix_rt::signal::ctrl_c().await?;

    Ok(())
}

impl MyState {
    pub fn new(app_name: &str) -> Self {
        MyState {
            app_name: app_name.to_owned(),
        }
    }
}

impl MyJob {
    pub fn new(some_usize: usize, other_usize: usize) -> Self {
        MyJob {
            some_usize,
            other_usize,
        }
    }
}

// Describes how `MyJob` is processed: the state it receives, the queue it is
// placed on, its retry policy, and the work performed when it runs.
#[async_trait::async_trait]
impl Job for MyJob {
    // The state value passed to `run`
    type State = MyState;
    // The future type produced by `run`; here, one that is immediately ready
    type Future = Ready<Result<(), Error>>;

    // The name of the job. It is super important that each job has a unique name,
    // because otherwise one job will overwrite another job when they're being
    // registered.
    const NAME: &'static str = "MyJob";

    // The queue that this processor belongs to
    //
    // Workers have the option to subscribe to specific queues, so this is important to
    // determine which worker will call the processor
    //
    // Jobs can optionally override the queue they're spawned on
    const QUEUE: &'static str = DEFAULT_QUEUE;

    // The number of times background-jobs should try to retry a job before giving up
    //
    // This value defaults to MaxRetries::Count(5)
    // Jobs can optionally override this value
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);

    // The logic to determine how often to retry this job if it fails
    //
    // This value defaults to Backoff::Exponential(2)
    // Jobs can optionally override this value
    const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2);

    // When should the job be considered dead
    //
    // The timeout defines when a job is allowed to be considered dead, and so can be retried
    // by the job processor. The value is in milliseconds and defaults to 15,000
    const TIMEOUT: i64 = 15_000;

    // Prints the application name and the job's arguments, then reports success.
    //
    // NOTE(review): `run` is declared `async` yet also returns `Self::Future`;
    // confirm against the `Job` trait definition that both are intended.
    async fn run(self, state: MyState) -> Self::Future {
        println!("{}: args, {:?}", state.app_name, self);

        ok(())
    }
}

Structs

QueueHandle

A handle to the job server, used for queuing new jobs

WorkerConfig

Worker Configuration

Traits

ActixJob

The ActixJob trait defines parameters pertaining to an instance of a background job

Functions

create_server

Create a new Server