Expand description

§An Actix-based Jobs Processor

This library will spin up as many actors as requested for each processor to process jobs concurrently. Keep in mind that, by default, spawned actors run on the same Arbiter, so in order to achieve parallel execution, multiple Arbiters must be in use.

The thread count is used to spawn Synchronous Actors to handle the storage of job information. For storage backends that cannot be parallelized, a thread-count of 1 should be used. By default, the number of cores of the running system is used.

§Example

use background_jobs_core::{Backoff, Job, MaxRetries, BoxError};
use background_jobs_actix::{ActixTimer, WorkerConfig};
use std::future::{ready, Ready};

const DEFAULT_QUEUE: &'static str = "default";

#[derive(Clone, Debug)]
pub struct MyState {
    pub app_name: String,
}

#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct MyJob {
    some_usize: usize,
    other_usize: usize,
}

#[actix_rt::main]
async fn main() -> Result<(), BoxError> {
    // Set up our Storage
    // For this example, we use the default in-memory storage mechanism
    use background_jobs_core::memory_storage::Storage;
    let storage = Storage::new(ActixTimer);

    // Configure and start our workers
    let queue_handle = WorkerConfig::new(storage, move |_| MyState::new("My App"))
        .register::<MyJob>()
        .set_worker_count(DEFAULT_QUEUE, 16)
        .start();

    // Queue our jobs
    queue_handle.queue(MyJob::new(1, 2)).await?;
    queue_handle.queue(MyJob::new(3, 4)).await?;
    queue_handle.queue(MyJob::new(5, 6)).await?;

    // actix_rt::signal::ctrl_c().await?;

    Ok(())
}

impl MyState {
    pub fn new(app_name: &str) -> Self {
        MyState {
            app_name: app_name.to_owned(),
        }
    }
}

impl MyJob {
    pub fn new(some_usize: usize, other_usize: usize) -> Self {
        MyJob {
            some_usize,
            other_usize,
        }
    }
}

impl Job for MyJob {
    type State = MyState;
    type Future = Ready<Result<(), BoxError>>;

    // The name of the job. It is super important that each job has a unique name,
    // because otherwise one job will overwrite another job when they're being
    // registered.
    const NAME: &'static str = "MyJob";

    // The queue that this processor belongs to
    //
    // Workers have the option to subscribe to specific queues, so this is important to
    // determine which worker will call the processor
    //
    // Jobs can optionally override the queue they're spawned on
    const QUEUE: &'static str = DEFAULT_QUEUE;

    // The number of times background-jobs should try to retry a job before giving up
    //
    // This value defaults to MaxRetries::Count(5)
    // Jobs can optionally override this value
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);

    // The logic to determine how often to retry this job if it fails
    //
    // This value defaults to Backoff::Exponential(2)
    // Jobs can optionally override this value
    const BACKOFF: Backoff = Backoff::Exponential(2);

    // This is important for allowing the job server to reap processes that were started but never
    // completed.
    //
    // Defaults to 5 seconds
    const HEARTBEAT_INTERVAL: u64 = 5_000;

    fn run(self, state: MyState) -> Self::Future {
        println!("{}: args, {:?}", state.app_name, self);

        ready(Ok(()))
    }
}

Structs§

  • Provide a spawner for actix-based systems for Unsend Jobs
  • A timer implementation for the Memory Storage backend
  • Marker type for Managed workers
  • Manager for worker threads
  • A handle to the job server, used for queuing new jobs
  • Marker type for Unmanaged workers
  • Worker Configuration