An Actix-based Jobs Processor
This library will spin up as many actors as requested for each processor to process jobs
concurrently. Keep in mind that, by default, spawned actors run on the same Arbiter, so in
order to achieve parallel execution, multiple Arbiters must be in use.
The thread count is used to spawn Synchronous Actors to handle the storage of job
information. For storage backends that cannot be parallelized, a thread count of 1 should be
used. By default, the number of cores of the running system is used.
Example
use anyhow::Error;
use background_jobs::{create_server, Backoff, Job, MaxRetries, WorkerConfig};
use futures::future::{ok, Ready};
/// Name of the queue that jobs are routed to unless they specify otherwise.
// NOTE: `'static` is implied for references in `const` items, so the explicit
// lifetime was redundant (clippy::redundant_static_lifetimes).
const DEFAULT_QUEUE: &str = "default";
/// Application state cloned into each worker via the `WorkerConfig` state factory.
#[derive(Clone, Debug)]
pub struct MyState {
/// Name printed alongside job arguments in `MyJob::run`.
pub app_name: String,
}
/// Example job payload; the serde derives let the processor serialize it for
/// storage and deserialize it again when a worker picks it up.
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct MyJob {
/// First payload value (no semantics beyond demonstrating serialization).
some_usize: usize,
/// Second payload value.
other_usize: usize,
}
#[actix_rt::main]
async fn main() -> Result<(), Error> {
use background_jobs::memory_storage::Storage;
// In-memory storage: queued job state is lost when the process exits.
let storage = Storage::new();
// Spin up the jobs server backed by that storage; the returned handle is
// used both to configure workers and to queue jobs.
let queue_handle = create_server(storage);
// Register the job type and dedicate 16 workers to the default queue.
// The closure builds a fresh `MyState` for the workers.
WorkerConfig::new(move || MyState::new("My App"))
.register::<MyJob>()
.set_worker_count(DEFAULT_QUEUE, 16)
.start(queue_handle.clone());
// Enqueue three jobs; `?` propagates any queueing failure.
queue_handle.queue(MyJob::new(1, 2))?;
queue_handle.queue(MyJob::new(3, 4))?;
queue_handle.queue(MyJob::new(5, 6))?;
// Keep the process alive so workers can run until Ctrl-C is received.
actix_rt::signal::ctrl_c().await?;
Ok(())
}
impl MyState {
pub fn new(app_name: &str) -> Self {
MyState {
app_name: app_name.to_owned(),
}
}
}
impl MyJob {
pub fn new(some_usize: usize, other_usize: usize) -> Self {
MyJob {
some_usize,
other_usize,
}
}
}
impl Job for MyJob {
    type State = MyState;
    // `run` finishes immediately, so an already-resolved `Ready` future fits.
    type Future = Ready<Result<(), Error>>;

    /// Unique name identifying this job type in storage.
    const NAME: &'static str = "MyJob";
    /// Queue this job is routed to; must match a configured worker queue.
    const QUEUE: &'static str = DEFAULT_QUEUE;
    /// Retry a failed job at most once.
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
    /// Double the delay between successive retries.
    const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2);
    // Missing `;` in the original — it would not compile. Value presumably
    // means 15 seconds in milliseconds; confirm units against the crate docs.
    const TIMEOUT: i64 = 15_000;

    // Plain (non-async) fn: the original's `#[async_trait]` + `async fn`
    // conflicted with `type Future = Ready<...>` — an async body returning
    // `ok(())` would yield a future of a `Ready`, not the `Ready` itself.
    fn run(self, state: MyState) -> Self::Future {
        println!("{}: args, {:?}", state.app_name, self);
        ok(())
    }
}