background_jobs_actix/
lib.rs

1#![deny(missing_docs)]
2
3//! # An Actix-based Jobs Processor
4//!
5//! This library will spin up as many actors as requested for each processor to process jobs
6//! concurrently. Keep in mind that, by default, spawned actors run on the same Arbiter, so in
7//! order to achieve parallel execution, multiple Arbiters must be in use.
8//!
9//! The thread count is used to spawn Synchronous Actors to handle the storage of job
10//! information. For storage backends that cannot be parallelized, a thread-count of 1 should be
11//! used. By default, the number of cores of the running system is used.
12//!
13//! ### Example
14//! ```rust
15//! use background_jobs_core::{Backoff, Job, MaxRetries, BoxError};
16//! use background_jobs_actix::{ActixTimer, WorkerConfig};
17//! use std::future::{ready, Ready};
18//!
19//! const DEFAULT_QUEUE: &'static str = "default";
20//!
21//! #[derive(Clone, Debug)]
22//! pub struct MyState {
23//!     pub app_name: String,
24//! }
25//!
26//! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
27//! pub struct MyJob {
28//!     some_usize: usize,
29//!     other_usize: usize,
30//! }
31//!
32//! #[actix_rt::main]
33//! async fn main() -> Result<(), BoxError> {
34//!     // Set up our Storage
35//!     // For this example, we use the default in-memory storage mechanism
36//!     use background_jobs_core::memory_storage::Storage;
37//!     let storage = Storage::new(ActixTimer);
38//!
39//!     // Configure and start our workers
40//!     let job_queue = WorkerConfig::new(storage, move |_| MyState::new("My App"))
41//!         .register::<MyJob>()
42//!         .set_worker_count(DEFAULT_QUEUE, 16)
43//!         .start();
44//!
45//!     // Queue our jobs
46//!     job_queue.push(MyJob::new(1, 2)).await?;
47//!     job_queue.push(MyJob::new(3, 4)).await?;
48//!     job_queue.push(MyJob::new(5, 6)).await?;
49//!
50//!     // actix_rt::signal::ctrl_c().await?;
51//!
52//!     Ok(())
53//! }
54//!
55//! impl MyState {
56//!     pub fn new(app_name: &str) -> Self {
57//!         MyState {
58//!             app_name: app_name.to_owned(),
59//!         }
60//!     }
61//! }
62//!
63//! impl MyJob {
64//!     pub fn new(some_usize: usize, other_usize: usize) -> Self {
65//!         MyJob {
66//!             some_usize,
67//!             other_usize,
68//!         }
69//!     }
70//! }
71//!
72//! impl Job for MyJob {
73//!     type State = MyState;
74//!     type Error = BoxError;
75//!
76//!     // The name of the job. It is super important that each job has a unique name,
77//!     // because otherwise one job will overwrite another job when they're being
78//!     // registered.
79//!     const NAME: &'static str = "MyJob";
80//!
81//!     // The queue that this processor belongs to
82//!     //
83//!     // Workers have the option to subscribe to specific queues, so this is important to
84//!     // determine which worker will call the processor
85//!     //
86//!     // Jobs can optionally override the queue they're spawned on
87//!     const QUEUE: &'static str = DEFAULT_QUEUE;
88//!
89//!     // The number of times background-jobs should try to retry a job before giving up
90//!     //
91//!     // This value defaults to MaxRetries::Count(5)
92//!     // Jobs can optionally override this value
93//!     const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
94//!
95//!     // The logic to determine how often to retry this job if it fails
96//!     //
97//!     // This value defaults to Backoff::Exponential(2)
98//!     // Jobs can optionally override this value
99//!     const BACKOFF: Backoff = Backoff::Exponential(2);
100//!
101//!     // This is important for allowing the job storage to reap processes that were started but never
102//!     // completed.
103//!     //
104//!     // Defaults to 5 seconds
105//!     const HEARTBEAT_INTERVAL: u64 = 5_000;
106//!
107//!     async fn run(self, state: MyState) -> Result<(), Self::Error> {
108//!         println!("{}: args, {:?}", state.app_name, self);
109//!
110//!         Ok(())
111//!     }
112//! }
113//! ```
114
115use actix_rt::{Arbiter, ArbiterHandle};
116use background_jobs_core::{
117    memory_storage::Timer, new_job, new_scheduled_job, BoxError, DynStorage, Job, ProcessorMap,
118    Storage,
119};
120use std::{
121    collections::BTreeMap,
122    marker::PhantomData,
123    num::NonZeroUsize,
124    ops::Deref,
125    sync::Arc,
126    time::{Duration, SystemTime},
127};
128use tokio::sync::Notify;
129
130mod actix_job;
131mod every;
132mod spawn;
133mod worker;
134
135use self::every::every;
136
137type JobStorage = Arc<dyn DynStorage<BoxError> + Sync>;
138
139pub use actix_job::ActixSpawner;
140
/// Timer handed to the in-memory storage backend
///
/// This is a zero-sized handle; its [`Timer`] implementation delegates to `tokio::time::timeout`.
#[derive(Clone, Debug)]
pub struct ActixTimer;
144
145impl Timer for ActixTimer {
146    async fn timeout<F>(&self, duration: Duration, future: F) -> Result<F::Output, ()>
147    where
148        F: std::future::Future + Send + Sync,
149    {
150        tokio::time::timeout(duration, future).await.map_err(|_| ())
151    }
152}
153
154/// Manager for worker threads
155///
156/// Manager attempts to restart workers as their arbiters die. Dropping the manager kills the
157/// workers
158pub struct Manager {
159    // the manager arbiter
160    arbiter: Option<Arbiter>,
161
162    // handle for queueing
163    job_queue: JobQueue,
164}
165
166impl Manager {
167    /// Create a new manager to keep jobs alive
168    ///
169    /// Manager works by startinng a new Arbiter to run jobs, and if that arbiter ever dies, it
170    /// spins up another one and spawns the workers again
171    fn new<State>(worker_config: WorkerConfig<State, Managed>, thread_count: NonZeroUsize) -> Self
172    where
173        State: Clone,
174    {
175        let manager_arbiter = Arbiter::new();
176        let job_queue = worker_config.job_queue.clone();
177
178        for i in 0..thread_count.into() {
179            let worker_config = worker_config.clone();
180
181            manager_arbiter.spawn(async move {
182                let mut worker_arbiter = ArbiterDropper::new();
183
184                loop {
185                    let notifier = DropNotifier::default();
186                    worker_config.start_managed(&worker_arbiter.handle(), &());
187
188                    let notified = notifier.notify.notified();
189
190                    let drop_notifier = notifier.clone();
191                    worker_arbiter.spawn(async move {
192                        std::future::pending::<()>().await;
193                        drop(drop_notifier);
194                    });
195
196                    notified.await;
197
198                    metrics::counter!("background-jobs.actix.worker-arbiter.restart", "number" => i.to_string()).increment(1);
199                    tracing::warn!("Recovering from dead worker arbiter");
200
201                    drop(worker_arbiter);
202
203                    worker_arbiter = ArbiterDropper::new();
204                }
205            });
206        }
207
208        Manager {
209            arbiter: Some(manager_arbiter),
210            job_queue,
211        }
212    }
213
214    /// Retrieve the JobQueue for the managed workers
215    pub fn job_queue(&self) -> &JobQueue {
216        &self.job_queue
217    }
218}
219
220impl Deref for Manager {
221    type Target = JobQueue;
222
223    fn deref(&self) -> &Self::Target {
224        &self.job_queue
225    }
226}
227
228impl Drop for Manager {
229    fn drop(&mut self) {
230        tracing::warn!("Dropping manager, tearing down workers");
231        if let Some(arbiter) = self.arbiter.take() {
232            arbiter.stop();
233            let _ = arbiter.join();
234        }
235    }
236}
237
238#[derive(Clone, Default)]
239struct DropNotifier {
240    notify: Arc<Notify>,
241}
242
243impl Drop for DropNotifier {
244    fn drop(&mut self) {
245        tracing::warn!("DropNotifier dropped - Arbiter tearing down");
246        self.notify.notify_waiters();
247    }
248}
249
250struct ArbiterDropper {
251    arbiter: Option<Arbiter>,
252}
253
254impl ArbiterDropper {
255    fn new() -> Self {
256        Self {
257            arbiter: Some(Arbiter::new()),
258        }
259    }
260}
261
262impl Deref for ArbiterDropper {
263    type Target = Arbiter;
264
265    fn deref(&self) -> &Self::Target {
266        self.arbiter.as_ref().unwrap()
267    }
268}
269
270impl Drop for ArbiterDropper {
271    fn drop(&mut self) {
272        tracing::warn!("Stopping and joining arbiter");
273        let arbiter = self.arbiter.take().unwrap();
274        arbiter.stop();
275        let _ = arbiter.join();
276        tracing::warn!("Joined");
277    }
278}
279
280/// Create a new managed Server
281///
282/// In previous versions of this library, the server itself was run on it's own dedicated threads
283/// and guarded access to jobs via messages. Since we now have futures-aware synchronization
284/// primitives, the Server has become an object that gets shared between client threads.
285fn create_server_managed<S>(storage: S) -> JobQueue
286where
287    S: Storage + Sync + 'static,
288{
289    JobQueue {
290        storage: Arc::new(storage),
291    }
292}
293
/// Marker type for Unmanaged workers
///
/// Used as the second type parameter of `WorkerConfig` to select the API that starts workers on
/// a caller-provided arbiter.
#[derive(Clone, Debug)]
pub struct Unmanaged;
/// Marker type for Managed workers
///
/// Used as the second type parameter of `WorkerConfig` to select the API that starts workers
/// under a `Manager`.
#[derive(Clone, Debug)]
pub struct Managed;
300
301/// Worker Configuration
302///
303/// This type is used for configuring and creating workers to process jobs. Before starting the
304/// workers, register `Job` types with this struct. This worker registration allows for
305/// different worker processes to handle different sets of workers.
306#[derive(Clone)]
307pub struct WorkerConfig<State, M>
308where
309    State: Clone + 'static,
310{
311    processors: ProcessorMap<State>,
312    queues: BTreeMap<String, u64>,
313    arbiter: Option<ArbiterHandle>,
314    job_queue: JobQueue,
315    managed: PhantomData<M>,
316}
317
318impl<State> WorkerConfig<State, Managed>
319where
320    State: Clone + 'static,
321{
322    /// Create a new managed WorkerConfig
323    ///
324    /// The supplied function should return the State required by the jobs intended to be
325    /// processed. The function must be sharable between threads, but the state itself does not
326    /// have this requirement.
327    pub fn new_managed<S: Storage + Send + Sync + 'static>(
328        storage: S,
329        state_fn: impl Fn(JobQueue) -> State + Send + Sync + 'static,
330    ) -> Self {
331        let job_queue = create_server_managed(storage);
332        let q2 = job_queue.clone();
333
334        WorkerConfig {
335            processors: ProcessorMap::new(Arc::new(move || state_fn(q2.clone()))),
336            queues: BTreeMap::new(),
337            arbiter: None,
338            job_queue,
339            managed: PhantomData,
340        }
341    }
342
343    /// Start the workers on a managed thread, returning the manager struct
344    pub fn start(self) -> Manager {
345        Self::start_with_threads(self, NonZeroUsize::try_from(1).expect("nonzero"))
346    }
347
348    /// Start the workers on the specified number of managed threads, returning the Manager struct
349    pub fn start_with_threads(self, thread_count: NonZeroUsize) -> Manager {
350        Manager::new(self, thread_count)
351    }
352}
353
354impl<State> WorkerConfig<State, Unmanaged>
355where
356    State: Clone + 'static,
357{
358    /// Create a new WorkerConfig in the current arbiter
359    ///
360    /// The supplied function should return the State required by the jobs intended to be
361    /// processed. The function must be sharable between threads, but the state itself does not
362    /// have this requirement.
363    pub fn new<S: Storage + Send + Sync + 'static>(
364        storage: S,
365        state_fn: impl Fn(JobQueue) -> State + Send + Sync + 'static,
366    ) -> Self {
367        Self::new_in_arbiter(Arbiter::current(), storage, state_fn)
368    }
369
370    /// Create a new WorkerConfig in the provided arbiter
371    ///
372    /// The supplied function should return the State required by the jobs intended to be
373    /// processed. The function must be sharable between threads, but the state itself does not
374    /// have this requirement.
375    pub fn new_in_arbiter<S: Storage + Send + Sync + 'static>(
376        arbiter: ArbiterHandle,
377        storage: S,
378        state_fn: impl Fn(JobQueue) -> State + Send + Sync + 'static,
379    ) -> Self {
380        let job_queue = create_server_managed(storage);
381        let q2 = job_queue.clone();
382
383        WorkerConfig {
384            processors: ProcessorMap::new(Arc::new(move || state_fn(q2.clone()))),
385            queues: BTreeMap::new(),
386            arbiter: Some(arbiter),
387            job_queue,
388            managed: PhantomData,
389        }
390    }
391
392    /// Start the workers in the provided arbiter
393    pub fn start(self) -> JobQueue {
394        self.start_managed(self.arbiter.as_ref().unwrap(), &());
395
396        self.job_queue
397    }
398}
399
400impl<State, M> WorkerConfig<State, M>
401where
402    State: Clone + 'static,
403{
404    /// Register a `Job` with the worker
405    ///
406    /// This enables the worker to handle jobs associated with this processor. If a processor is
407    /// not registered, none of it's jobs will be run, even if another processor handling the same
408    /// job queue is registered.
409    pub fn register<J>(mut self) -> Self
410    where
411        J: Job<State = State>,
412    {
413        self.queues.insert(J::QUEUE.to_owned(), 4);
414        self.processors.register::<J>();
415        self
416    }
417
418    /// Set the number of workers to run for a given queue
419    ///
420    /// This does not spin up any additional threads. The `Arbiter` the workers are spawned onto
421    /// will handle processing all workers, regardless of how many are configured.
422    ///
423    /// By default, 4 workers are spawned
424    pub fn set_worker_count(mut self, queue: &str, count: u64) -> Self {
425        self.queues.insert(queue.to_owned(), count);
426        self
427    }
428
429    /// Start a workers in a managed way
430    fn start_managed<Extras: Clone + Send + 'static>(
431        &self,
432        arbiter: &ArbiterHandle,
433        extras: &Extras,
434    ) {
435        for (key, count) in self.queues.iter() {
436            for _ in 0..*count {
437                let queue = key.clone();
438                let processors = self.processors.clone();
439                let storage = self.job_queue.storage.clone();
440
441                let extras_2 = extras.clone();
442
443                arbiter.spawn_fn(move || {
444                    if let Err(e) = spawn::spawn(
445                        "local-worker",
446                        worker::local_worker(queue, processors.cached(), storage, extras_2),
447                    ) {
448                        tracing::error!("Failed to spawn worker {e}");
449                    }
450                });
451            }
452        }
453    }
454}
455
456#[deprecated(since = "0.20.0")]
457/// Deprecated alias for [`JobQueue`]
458pub type QueueHandle = JobQueue;
459
460/// A handle to the job storage, used for queuing new jobs
461///
462/// `JobQueue` should be stored in your application's state in order to allow all parts of your
463/// application to spawn jobs.
464#[derive(Clone)]
465pub struct JobQueue {
466    storage: JobStorage,
467}
468
469impl JobQueue {
470    #[deprecated(since = "0.20.0")]
471    /// Queues a job for execution
472    ///
473    /// Deprecated in favor of [`JobQueue::push`]
474    pub async fn queue<J>(&self, job: J) -> Result<(), BoxError>
475    where
476        J: Job,
477    {
478        self.push(job).await
479    }
480
481    /// Pushes a job into the job queue
482    ///
483    /// This job will be added to the storage, and will execute whenever a worker for the
484    /// job's queue is free to do so.
485    pub async fn push<J>(&self, job: J) -> Result<(), BoxError>
486    where
487        J: Job,
488    {
489        let job = new_job(job)?;
490        self.storage.push(job).await?;
491        Ok(())
492    }
493
494    /// Schedule a job for execution later
495    ///
496    /// This job will be added to the storage, and will execute after the specified time
497    /// and when a worker for the job's queue is free to do so.
498    pub async fn schedule<J>(&self, job: J, after: SystemTime) -> Result<(), BoxError>
499    where
500        J: Job,
501    {
502        let job = new_scheduled_job(job, after)?;
503        self.storage.push(job).await?;
504        Ok(())
505    }
506
507    /// Queues a job for recurring execution
508    ///
509    /// This job will be added to its queue in the storage once every `Duration`. It will be
510    /// processed whenever workers are free to do so.
511    pub fn every<J>(&self, duration: Duration, job: J) -> std::io::Result<()>
512    where
513        J: Job + Clone + Send + 'static,
514    {
515        spawn::spawn("every", every(self.clone(), duration, job)).map(|_| ())
516    }
517}