background_jobs_actix/lib.rs
1#![deny(missing_docs)]
2
3//! # An Actix-based Jobs Processor
4//!
5//! This library will spin up as many actors as requested for each processor to process jobs
6//! concurrently. Keep in mind that, by default, spawned actors run on the same Arbiter, so in
7//! order to achieve parallel execution, multiple Arbiters must be in use.
8//!
9//! The thread count is used to spawn Synchronous Actors to handle the storage of job
10//! information. For storage backends that cannot be parallelized, a thread-count of 1 should be
11//! used. By default, the number of cores of the running system is used.
12//!
13//! ### Example
14//! ```rust
15//! use background_jobs_core::{Backoff, Job, MaxRetries, BoxError};
16//! use background_jobs_actix::{ActixTimer, WorkerConfig};
17//! use std::future::{ready, Ready};
18//!
19//! const DEFAULT_QUEUE: &'static str = "default";
20//!
21//! #[derive(Clone, Debug)]
22//! pub struct MyState {
23//! pub app_name: String,
24//! }
25//!
26//! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
27//! pub struct MyJob {
28//! some_usize: usize,
29//! other_usize: usize,
30//! }
31//!
32//! #[actix_rt::main]
33//! async fn main() -> Result<(), BoxError> {
34//! // Set up our Storage
35//! // For this example, we use the default in-memory storage mechanism
36//! use background_jobs_core::memory_storage::Storage;
37//! let storage = Storage::new(ActixTimer);
38//!
39//! // Configure and start our workers
40//! let queue_handle = WorkerConfig::new(storage, move |_| MyState::new("My App"))
41//! .register::<MyJob>()
42//! .set_worker_count(DEFAULT_QUEUE, 16)
43//! .start();
44//!
45//! // Queue our jobs
46//! queue_handle.queue(MyJob::new(1, 2)).await?;
47//! queue_handle.queue(MyJob::new(3, 4)).await?;
48//! queue_handle.queue(MyJob::new(5, 6)).await?;
49//!
50//! // actix_rt::signal::ctrl_c().await?;
51//!
52//! Ok(())
53//! }
54//!
55//! impl MyState {
56//! pub fn new(app_name: &str) -> Self {
57//! MyState {
58//! app_name: app_name.to_owned(),
59//! }
60//! }
61//! }
62//!
63//! impl MyJob {
64//! pub fn new(some_usize: usize, other_usize: usize) -> Self {
65//! MyJob {
66//! some_usize,
67//! other_usize,
68//! }
69//! }
70//! }
71//!
72//! impl Job for MyJob {
73//! type State = MyState;
74//! type Future = Ready<Result<(), BoxError>>;
75//!
76//! // The name of the job. It is super important that each job has a unique name,
77//! // because otherwise one job will overwrite another job when they're being
78//! // registered.
79//! const NAME: &'static str = "MyJob";
80//!
81//! // The queue that this processor belongs to
82//! //
83//! // Workers have the option to subscribe to specific queues, so this is important to
84//! // determine which worker will call the processor
85//! //
86//! // Jobs can optionally override the queue they're spawned on
87//! const QUEUE: &'static str = DEFAULT_QUEUE;
88//!
89//! // The number of times background-jobs should try to retry a job before giving up
90//! //
91//! // This value defaults to MaxRetries::Count(5)
92//! // Jobs can optionally override this value
93//! const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
94//!
95//! // The logic to determine how often to retry this job if it fails
96//! //
97//! // This value defaults to Backoff::Exponential(2)
98//! // Jobs can optionally override this value
99//! const BACKOFF: Backoff = Backoff::Exponential(2);
100//!
101//! // This is important for allowing the job server to reap processes that were started but never
102//! // completed.
103//! //
104//! // Defaults to 5 seconds
105//! const HEARTBEAT_INTERVAL: u64 = 5_000;
106//!
107//! fn run(self, state: MyState) -> Self::Future {
108//! println!("{}: args, {:?}", state.app_name, self);
109//!
110//! ready(Ok(()))
111//! }
112//! }
113//! ```
114
115use actix_rt::{Arbiter, ArbiterHandle};
116use background_jobs_core::{
117 memory_storage::Timer, new_job, new_scheduled_job, BoxError, Job, ProcessorMap, Storage,
118};
119use std::{
120 collections::BTreeMap,
121 marker::PhantomData,
122 num::NonZeroUsize,
123 ops::Deref,
124 sync::Arc,
125 time::{Duration, SystemTime},
126};
127use tokio::sync::Notify;
128
129mod actix_job;
130mod every;
131mod server;
132mod spawn;
133mod storage;
134mod worker;
135
136use self::{every::every, server::Server};
137
138pub use actix_job::ActixSpawner;
139
140/// A timer implementation for the Memory Storage backend
141#[derive(Debug, Clone)]
142pub struct ActixTimer;
143
144#[async_trait::async_trait]
145impl Timer for ActixTimer {
146 async fn timeout<F>(&self, duration: Duration, future: F) -> Result<F::Output, ()>
147 where
148 F: std::future::Future + Send + Sync,
149 {
150 tokio::time::timeout(duration, future).await.map_err(|_| ())
151 }
152}
153
154/// Manager for worker threads
155///
156/// Manager attempts to restart workers as their arbiters die. Dropping the manager kills the
157/// workers
158pub struct Manager {
159 // the manager arbiter
160 arbiter: Option<Arbiter>,
161
162 // handle for queueing
163 queue_handle: QueueHandle,
164}
165
166impl Manager {
167 /// Create a new manager to keep jobs alive
168 ///
169 /// Manager works by startinng a new Arbiter to run jobs, and if that arbiter ever dies, it
170 /// spins up another one and spawns the workers again
171 fn new<State>(worker_config: WorkerConfig<State, Managed>, thread_count: NonZeroUsize) -> Self
172 where
173 State: Clone,
174 {
175 let manager_arbiter = Arbiter::new();
176 let queue_handle = worker_config.queue_handle.clone();
177
178 for i in 0..thread_count.into() {
179 let worker_config = worker_config.clone();
180
181 manager_arbiter.spawn(async move {
182 let mut worker_arbiter = ArbiterDropper::new();
183
184 loop {
185 let notifier = DropNotifier::default();
186 worker_config.start_managed(&worker_arbiter.handle(), &());
187
188 let notified = notifier.notify.notified();
189
190 let drop_notifier = notifier.clone();
191 worker_arbiter.spawn(async move {
192 std::future::pending::<()>().await;
193 drop(drop_notifier);
194 });
195
196 notified.await;
197
198 metrics::counter!("background-jobs.actix.worker-arbiter.restart", "number" => i.to_string()).increment(1);
199 tracing::warn!("Recovering from dead worker arbiter");
200
201 drop(worker_arbiter);
202
203 worker_arbiter = ArbiterDropper::new();
204 }
205 });
206 }
207
208 Manager {
209 arbiter: Some(manager_arbiter),
210 queue_handle,
211 }
212 }
213
214 /// Retrieve the QueueHandle for the managed workers
215 pub fn queue_handle(&self) -> &QueueHandle {
216 &self.queue_handle
217 }
218}
219
220impl Deref for Manager {
221 type Target = QueueHandle;
222
223 fn deref(&self) -> &Self::Target {
224 &self.queue_handle
225 }
226}
227
228impl Drop for Manager {
229 fn drop(&mut self) {
230 tracing::warn!("Dropping manager, tearing down workers");
231 if let Some(arbiter) = self.arbiter.take() {
232 arbiter.stop();
233 let _ = arbiter.join();
234 }
235 }
236}
237
238#[derive(Clone, Default)]
239struct DropNotifier {
240 notify: Arc<Notify>,
241}
242
243impl Drop for DropNotifier {
244 fn drop(&mut self) {
245 tracing::warn!("DropNotifier dropped - Arbiter tearing down");
246 self.notify.notify_waiters();
247 }
248}
249
250struct ArbiterDropper {
251 arbiter: Option<Arbiter>,
252}
253
254impl ArbiterDropper {
255 fn new() -> Self {
256 Self {
257 arbiter: Some(Arbiter::new()),
258 }
259 }
260}
261
262impl Deref for ArbiterDropper {
263 type Target = Arbiter;
264
265 fn deref(&self) -> &Self::Target {
266 self.arbiter.as_ref().unwrap()
267 }
268}
269
270impl Drop for ArbiterDropper {
271 fn drop(&mut self) {
272 tracing::warn!("Stopping and joining arbiter");
273 let arbiter = self.arbiter.take().unwrap();
274 arbiter.stop();
275 let _ = arbiter.join();
276 tracing::warn!("Joined");
277 }
278}
279
280/// Create a new managed Server
281///
282/// In previous versions of this library, the server itself was run on it's own dedicated threads
283/// and guarded access to jobs via messages. Since we now have futures-aware synchronization
284/// primitives, the Server has become an object that gets shared between client threads.
285fn create_server_managed<S>(storage: S) -> QueueHandle
286where
287 S: Storage + Sync + 'static,
288{
289 QueueHandle {
290 inner: Server::new(storage),
291 }
292}
293
294/// Marker type for Unmanaged workers
295#[derive(Clone)]
296pub struct Unmanaged;
297/// Marker type for Managed workers
298#[derive(Clone)]
299pub struct Managed;
300
301/// Worker Configuration
302///
303/// This type is used for configuring and creating workers to process jobs. Before starting the
304/// workers, register `Job` types with this struct. This worker registration allows for
305/// different worker processes to handle different sets of workers.
306#[derive(Clone)]
307pub struct WorkerConfig<State, M>
308where
309 State: Clone + 'static,
310{
311 processors: ProcessorMap<State>,
312 queues: BTreeMap<String, u64>,
313 arbiter: Option<ArbiterHandle>,
314 queue_handle: QueueHandle,
315 managed: PhantomData<M>,
316}
317
318impl<State> WorkerConfig<State, Managed>
319where
320 State: Clone + 'static,
321{
322 /// Create a new managed WorkerConfig
323 ///
324 /// The supplied function should return the State required by the jobs intended to be
325 /// processed. The function must be sharable between threads, but the state itself does not
326 /// have this requirement.
327 pub fn new_managed<S: Storage + Send + Sync + 'static>(
328 storage: S,
329 state_fn: impl Fn(QueueHandle) -> State + Send + Sync + 'static,
330 ) -> Self {
331 let queue_handle = create_server_managed(storage);
332 let q2 = queue_handle.clone();
333
334 WorkerConfig {
335 processors: ProcessorMap::new(Arc::new(move || state_fn(q2.clone()))),
336 queues: BTreeMap::new(),
337 arbiter: None,
338 queue_handle,
339 managed: PhantomData,
340 }
341 }
342
343 /// Start the workers on a managed thread, returning the manager struct
344 pub fn start(self) -> Manager {
345 Self::start_with_threads(self, NonZeroUsize::try_from(1).expect("nonzero"))
346 }
347
348 /// Start the workers on the specified number of managed threads, returning the Manager struct
349 pub fn start_with_threads(self, thread_count: NonZeroUsize) -> Manager {
350 Manager::new(self, thread_count)
351 }
352}
353
354impl<State> WorkerConfig<State, Unmanaged>
355where
356 State: Clone + 'static,
357{
358 /// Create a new WorkerConfig in the current arbiter
359 ///
360 /// The supplied function should return the State required by the jobs intended to be
361 /// processed. The function must be sharable between threads, but the state itself does not
362 /// have this requirement.
363 pub fn new<S: Storage + Send + Sync + 'static>(
364 storage: S,
365 state_fn: impl Fn(QueueHandle) -> State + Send + Sync + 'static,
366 ) -> Self {
367 Self::new_in_arbiter(Arbiter::current(), storage, state_fn)
368 }
369
370 /// Create a new WorkerConfig in the provided arbiter
371 ///
372 /// The supplied function should return the State required by the jobs intended to be
373 /// processed. The function must be sharable between threads, but the state itself does not
374 /// have this requirement.
375 pub fn new_in_arbiter<S: Storage + Send + Sync + 'static>(
376 arbiter: ArbiterHandle,
377 storage: S,
378 state_fn: impl Fn(QueueHandle) -> State + Send + Sync + 'static,
379 ) -> Self {
380 let queue_handle = create_server_managed(storage);
381 let q2 = queue_handle.clone();
382
383 WorkerConfig {
384 processors: ProcessorMap::new(Arc::new(move || state_fn(q2.clone()))),
385 queues: BTreeMap::new(),
386 arbiter: Some(arbiter),
387 queue_handle,
388 managed: PhantomData,
389 }
390 }
391
392 /// Start the workers in the provided arbiter
393 pub fn start(self) -> QueueHandle {
394 self.start_managed(self.arbiter.as_ref().unwrap(), &());
395
396 self.queue_handle
397 }
398}
399
400impl<State, M> WorkerConfig<State, M>
401where
402 State: Clone + 'static,
403{
404 /// Register a `Job` with the worker
405 ///
406 /// This enables the worker to handle jobs associated with this processor. If a processor is
407 /// not registered, none of it's jobs will be run, even if another processor handling the same
408 /// job queue is registered.
409 pub fn register<J>(mut self) -> Self
410 where
411 J: Job<State = State>,
412 {
413 self.queues.insert(J::QUEUE.to_owned(), 4);
414 self.processors.register::<J>();
415 self
416 }
417
418 /// Set the number of workers to run for a given queue
419 ///
420 /// This does not spin up any additional threads. The `Arbiter` the workers are spawned onto
421 /// will handle processing all workers, regardless of how many are configured.
422 ///
423 /// By default, 4 workers are spawned
424 pub fn set_worker_count(mut self, queue: &str, count: u64) -> Self {
425 self.queues.insert(queue.to_owned(), count);
426 self
427 }
428
429 /// Start a workers in a managed way
430 fn start_managed<Extras: Clone + Send + 'static>(
431 &self,
432 arbiter: &ArbiterHandle,
433 extras: &Extras,
434 ) {
435 for (key, count) in self.queues.iter() {
436 for _ in 0..*count {
437 let queue = key.clone();
438 let processors = self.processors.clone();
439 let server = self.queue_handle.inner.clone();
440
441 let extras_2 = extras.clone();
442
443 arbiter.spawn_fn(move || {
444 if let Err(e) = spawn::spawn(
445 "local-worker",
446 worker::local_worker(queue, processors.cached(), server, extras_2),
447 ) {
448 tracing::error!("Failed to spawn worker {e}");
449 }
450 });
451 }
452 }
453 }
454}
455
456/// A handle to the job server, used for queuing new jobs
457///
458/// `QueueHandle` should be stored in your application's state in order to allow all parts of your
459/// application to spawn jobs.
460#[derive(Clone)]
461pub struct QueueHandle {
462 inner: Server,
463}
464
465impl QueueHandle {
466 /// Queues a job for execution
467 ///
468 /// This job will be sent to the server for storage, and will execute whenever a worker for the
469 /// job's queue is free to do so.
470 pub async fn queue<J>(&self, job: J) -> Result<(), BoxError>
471 where
472 J: Job,
473 {
474 let job = new_job(job)?;
475 self.inner.push(job).await?;
476 Ok(())
477 }
478
479 /// Schedule a job for execution later
480 ///
481 /// This job will be sent to the server for storage, and will execute after the specified time
482 /// and when a worker for the job's queue is free to do so.
483 pub async fn schedule<J>(&self, job: J, after: SystemTime) -> Result<(), BoxError>
484 where
485 J: Job,
486 {
487 let job = new_scheduled_job(job, after)?;
488 self.inner.push(job).await?;
489 Ok(())
490 }
491
492 /// Queues a job for recurring execution
493 ///
494 /// This job will be added to it's queue on the server once every `Duration`. It will be
495 /// processed whenever workers are free to do so.
496 pub fn every<J>(&self, duration: Duration, job: J) -> std::io::Result<()>
497 where
498 J: Job + Clone + Send + 'static,
499 {
500 spawn::spawn("every", every(self.clone(), duration, job)).map(|_| ())
501 }
502}