Crate background_jobs_actix

An Actix-based Jobs Processor

This library will spin up as many actors as requested for each processor to process jobs concurrently. Keep in mind that spawned actors run on the same Arbiter by default, so multiple Arbiters must be in use to achieve parallel execution.

The thread count is used to spawn synchronous actors to handle the storage of job information. For storage backends that cannot be parallelized, a thread count of 1 should be used. By default, the number of cores on the running system is used.
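
For example, if the storage backend is backed by a single connection that cannot be shared across threads, the server can be restricted to one storage actor. A minimal sketch of that configuration, reusing the in-memory Storage and the ServerConfig calls from the full example below (the system name is illustrative only):

use actix::System;
use background_jobs::{memory_storage::Storage, ServerConfig};

fn main() {
    // An Actix System must be running before the server actors are started.
    let sys = System::new("single-threaded-storage");

    // A thread count of 1 keeps all storage access on a single synchronous actor,
    // which is appropriate for backends that cannot be parallelized.
    let storage = Storage::new();
    let _queue_handle = ServerConfig::new(storage).thread_count(1).start();

    // Block on Actix; workers and jobs would be registered before this point.
    sys.run().unwrap();
}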

Example

This example is not tested
use actix::System;
use background_jobs::{Backoff, Job, MaxRetries, Processor, ServerConfig, WorkerConfig};
use failure::Error;
use futures::{future::ok, Future};
use serde_derive::{Deserialize, Serialize};

const DEFAULT_QUEUE: &'static str = "default";

#[derive(Clone, Debug)]
pub struct MyState {
    pub app_name: String,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MyJob {
    some_usize: usize,
    other_usize: usize,
}

#[derive(Clone, Debug)]
pub struct MyProcessor;

fn main() -> Result<(), Error> {
    // First set up the Actix System to ensure we have a runtime to spawn jobs on.
    let sys = System::new("my-actix-system");

    // Set up our Storage
    // For this example, we use the default in-memory storage mechanism
    use background_jobs::memory_storage::Storage;
    let storage = Storage::new();

    // Start the application server. This guards access to the jobs store
    let queue_handle = ServerConfig::new(storage).thread_count(8).start();

    // Configure and start our workers
    WorkerConfig::new(move || MyState::new("My App"))
        .register(MyProcessor)
        .set_processor_count(DEFAULT_QUEUE, 16)
        .start(queue_handle.clone());

    // Queue our jobs
    queue_handle.queue(MyJob::new(1, 2))?;
    queue_handle.queue(MyJob::new(3, 4))?;
    queue_handle.queue(MyJob::new(5, 6))?;

    // Block on Actix
    sys.run()?;
    Ok(())
}

impl MyState {
    pub fn new(app_name: &str) -> Self {
        MyState {
            app_name: app_name.to_owned(),
        }
    }
}

impl MyJob {
    pub fn new(some_usize: usize, other_usize: usize) -> Self {
        MyJob {
            some_usize,
            other_usize,
        }
    }
}

impl Job for MyJob {
    type Processor = MyProcessor;
    type State = MyState;
    type Future = Result<(), Error>;

    fn run(self, state: MyState) -> Self::Future {
        println!("{}: args, {:?}", state.app_name, self);

        Ok(())
    }
}

impl Processor for MyProcessor {
    // The kind of job this processor should execute
    type Job = MyJob;

    // The name of the processor. It is super important that each processor has a unique name,
    // because otherwise one processor will overwrite another processor when they're being
    // registered.
    const NAME: &'static str = "MyProcessor";

    // The queue that this processor belongs to
    //
    // Workers have the option to subscribe to specific queues, so this is important to
    // determine which worker will call the processor
    //
    // Jobs can optionally override the queue they're spawned on
    const QUEUE: &'static str = DEFAULT_QUEUE;

    // The number of times background-jobs should try to retry a job before giving up
    //
    // Jobs can optionally override this value
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);

    // The logic to determine how often to retry this job if it fails
    //
    // Jobs can optionally override this value
    const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2);
}

Structs

Every: A type used to schedule recurring jobs.
LocalWorker: A worker that runs on the same system as the jobs server.
QueueHandle: A handle to the job server, used for queuing new jobs.
Server: The server Actor.
ServerConfig: The configuration for a jobs server.
WorkerConfig: Worker configuration.