Crate background_jobs_actix
An Actix-based Jobs Processor
This library will spin up as many actors as requested for each processor so that jobs can be processed concurrently. Keep in mind that, by default, spawned actors run on the same Arbiter, so to achieve parallel execution, multiple Arbiters must be in use.
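As a rough std-only analogy (none of these names come from background-jobs or Actix): each Arbiter behaves like a dedicated worker thread with its own event loop, and jobs only run in parallel when more than one such thread pulls from the queue. A minimal sketch of that fan-out pattern:

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Fan `jobs` work items out to `workers` threads; returns how many were processed.
// `run_jobs` is a hypothetical helper for illustration only.
fn run_jobs(workers: usize, jobs: u32) -> u32 {
    let (tx, rx) = mpsc::channel::<u32>();
    let rx = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut processed = 0u32;
                loop {
                    // Lock only long enough to pull one job off the shared queue.
                    let job = rx.lock().unwrap().recv();
                    if job.is_err() {
                        break; // channel closed and drained: no more jobs
                    }
                    processed += 1;
                }
                processed
            })
        })
        .collect();

    for n in 0..jobs {
        tx.send(n).unwrap();
    }
    drop(tx); // close the channel so workers exit

    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    println!("processed {} jobs", run_jobs(4, 16));
}
```

With a single worker thread the same code still drains the queue, just serially; that mirrors the default situation where all actors share one Arbiter.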
The thread count is used to spawn Synchronous Actors to handle the storage of job information. For storage backends that cannot be parallelized, a thread count of 1 should be used. By default, the thread count is the number of CPU cores on the running system.
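A thread count of 1 effectively serializes all storage access through a single thread. A std-only sketch of that idea, assuming a hypothetical key-value store (the `Cmd` and `spawn_storage` names are illustrative, not part of this crate's API):

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Commands accepted by the single storage thread.
enum Cmd {
    Put(String, String),
    Get(String, mpsc::Sender<Option<String>>),
}

// Spawn one dedicated thread that owns the store outright. Because exactly one
// thread touches the data, operations are processed strictly in order and can
// never race with each other -- no locking required.
fn spawn_storage() -> mpsc::Sender<Cmd> {
    let (tx, rx) = mpsc::channel::<Cmd>();
    thread::spawn(move || {
        let mut store: HashMap<String, String> = HashMap::new();
        for cmd in rx {
            match cmd {
                Cmd::Put(k, v) => {
                    store.insert(k, v);
                }
                Cmd::Get(k, reply) => {
                    let _ = reply.send(store.get(&k).cloned());
                }
            }
        }
    });
    tx
}

fn main() {
    let storage = spawn_storage();
    storage
        .send(Cmd::Put("job-1".into(), "queued".into()))
        .unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    storage.send(Cmd::Get("job-1".into(), reply_tx)).unwrap();
    println!("{:?}", reply_rx.recv().unwrap());
}
```

A higher thread count would correspond to several such threads sharing the backend, which only helps when the backend itself tolerates concurrent access.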
Example
```rust
use anyhow::Error;
use background_jobs::{create_server, Backoff, Job, MaxRetries, WorkerConfig};
use futures::future::{ok, Ready};

const DEFAULT_QUEUE: &'static str = "default";

#[derive(Clone, Debug)]
pub struct MyState {
    pub app_name: String,
}

#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct MyJob {
    some_usize: usize,
    other_usize: usize,
}

#[actix_rt::main]
async fn main() -> Result<(), Error> {
    // Set up our Storage
    // For this example, we use the default in-memory storage mechanism
    use background_jobs::memory_storage::Storage;
    let storage = Storage::new();

    // Start the application server. This guards access to the jobs store
    let queue_handle = create_server(storage);

    // Configure and start our workers
    WorkerConfig::new(move || MyState::new("My App"))
        .register::<MyJob>()
        .set_worker_count(DEFAULT_QUEUE, 16)
        .start(queue_handle.clone());

    // Queue our jobs
    queue_handle.queue(MyJob::new(1, 2))?;
    queue_handle.queue(MyJob::new(3, 4))?;
    queue_handle.queue(MyJob::new(5, 6))?;

    actix_rt::signal::ctrl_c().await?;

    Ok(())
}

impl MyState {
    pub fn new(app_name: &str) -> Self {
        MyState {
            app_name: app_name.to_owned(),
        }
    }
}

impl MyJob {
    pub fn new(some_usize: usize, other_usize: usize) -> Self {
        MyJob {
            some_usize,
            other_usize,
        }
    }
}

impl Job for MyJob {
    type State = MyState;
    type Future = Ready<Result<(), Error>>;

    // The name of the job. It is super important that each job has a unique name,
    // because otherwise one job will overwrite another job when they're being
    // registered.
    const NAME: &'static str = "MyJob";

    // The queue that this processor belongs to
    //
    // Workers have the option to subscribe to specific queues, so this is important to
    // determine which worker will call the processor
    //
    // Jobs can optionally override the queue they're spawned on
    const QUEUE: &'static str = DEFAULT_QUEUE;

    // The number of times background-jobs should try to retry a job before giving up
    //
    // This value defaults to MaxRetries::Count(5)
    // Jobs can optionally override this value
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);

    // The logic to determine how often to retry this job if it fails
    //
    // This value defaults to Backoff::Exponential(2)
    // Jobs can optionally override this value
    const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2);

    // When should the job be considered dead
    //
    // The timeout defines when a job is allowed to be considered dead, and so can be
    // retried by the job processor. The value is in milliseconds and defaults to 15,000
    const TIMEOUT: i64 = 15_000;

    fn run(self, state: MyState) -> Self::Future {
        println!("{}: args, {:?}", state.app_name, self);
        ok(())
    }
}
```
Structs
QueueHandle | A handle to the job server, used for queuing new jobs |
WorkerConfig | Worker Configuration |
Traits
ActixJob | The ActixJob trait defines parameters pertaining to an instance of a background job |
Functions
create_server | Create a new Server |