Skip to main content

job/
lib.rs

1//! `job` is an async, Postgres-backed job scheduler and runner for Rust
2//! applications. It coordinates distributed workers, tracks job history, and
//! handles retries with predictable backoff. Inspired by earlier systems like
//! [Sidekiq](https://sidekiq.org), it focuses on running your application code
//! asynchronously, outside of request/response paths, while keeping business
//! logic in familiar Rust async functions. The crate uses [`sqlx`] for
7//! database access and forbids `unsafe`.
8//!
9//! ## Documentation
//! - [GitHub repository](https://github.com/GaloyMoney/job)
11//! - [Cargo package](https://crates.io/crates/job)
12//!
13//! ## Highlights
14//! - Durable Postgres-backed storage so jobs survive restarts and crashes.
15//! - Automatic exponential backoff with jitter, plus opt-in infinite retries.
16//! - Concurrency controls that let many worker instances share the workload,
17//!   configurable through [`JobPollerConfig`].
18//! - Optional at-most-one-per-type queueing via [`JobSpawner::spawn_unique`].
19//! - Built-in migrations that you can run automatically or embed into your own
20//!   migration workflow.
21//!
22//! ## Core Concepts
23//! - **Jobs service** – [`Jobs`] owns registration, polling, and shutdown.
24//! - **Initializer** – [`JobInitializer`] registers a job type and builds a
25//!   [`JobRunner`] for each execution. Defines the associated `Config` type.
26//! - **Spawner** – [`JobSpawner`] is returned from registration and provides
27//!   type-safe job creation methods. Parameterized by the config type.
28//! - **Runner** – [`JobRunner`] performs the work using the provided
29//!   [`CurrentJob`] context.
30//! - **Current job** – [`CurrentJob`] exposes attempt counts, execution state,
31//!   and access to the Postgres pool during a run.
32//! - **Completion** – [`JobCompletion`] returns the outcome: finish, retry, or
33//!   reschedule at a later time.
34//!
35//! ## Lifecycle
36//!
37//! 1. Initialize the service with [`Jobs::init`]
38//! 2. Register initializers with [`Jobs::add_initializer`] – returns a [`JobSpawner`]
39//! 3. Start polling with [`Jobs::start_poll`]
40//! 4. Use spawners to create jobs throughout your application
41//! 5. Shut down gracefully with [`Jobs::shutdown`]
42//!
43//! ## Example
44//!
45//! ```ignore
46//! use async_trait::async_trait;
47//! use job::{
48//!     CurrentJob, Job, JobCompletion, JobId, JobInitializer, JobRunner,
49//!     JobSpawner, JobSvcConfig, JobType, Jobs,
50//! };
51//! use serde::{Deserialize, Serialize};
52//!
53//! // 1. Define your config (serialized to the database)
54//! #[derive(Debug, Serialize, Deserialize)]
55//! struct MyConfig {
56//!     value: i32,
57//! }
58//!
59//! // 2. Define your initializer
60//! struct MyInitializer;
61//!
62//! impl JobInitializer for MyInitializer {
63//!     type Config = MyConfig;
64//!
65//!     fn job_type(&self) -> JobType {
66//!         JobType::new("my-job")
67//!     }
68//!
//!     fn init(
//!         &self,
//!         job: &Job,
//!         _spawner: JobSpawner<Self::Config>,
//!     ) -> Result<Box<dyn JobRunner>, Box<dyn std::error::Error>> {
70//!         let config: MyConfig = job.config()?;
71//!         Ok(Box::new(MyRunner { value: config.value }))
72//!     }
73//! }
74//!
75//! // 3. Define your runner
76//! struct MyRunner {
77//!     value: i32,
78//! }
79//!
80//! #[async_trait]
81//! impl JobRunner for MyRunner {
82//!     async fn run(
83//!         &self,
84//!         _current_job: CurrentJob,
85//!     ) -> Result<JobCompletion, Box<dyn std::error::Error>> {
86//!         println!("Processing value: {}", self.value);
87//!         Ok(JobCompletion::Complete)
88//!     }
89//! }
90//!
91//! // 4. Wire it up
92//! #[tokio::main]
93//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
94//!     let config = JobSvcConfig::builder()
95//!         .pg_con("postgres://user:pass@localhost/db")
96//!         .build()?;
97//!
98//!     let mut jobs = Jobs::init(config).await?;
99//!
100//!     // Registration returns a type-safe spawner
101//!     let spawner: JobSpawner<MyConfig> = jobs.add_initializer(MyInitializer);
102//!
103//!     jobs.start_poll().await?;
104//!
105//!     // Use the spawner to create jobs
106//!     spawner.spawn(JobId::new(), MyConfig { value: 42 }).await?;
107//!
108//!     Ok(())
109//! }
110//! ```
111//!
112//! ## Scheduling
113//!
//! By default, a job becomes eligible to run as soon as it is spawned and starts
//! once a poller claims it. If you need a future start time, schedule it up front
//! with [`JobSpawner::spawn_at_in_op`]. After a run completes, return
//! [`JobCompletion::Complete`] for one-off work or use the
//! `JobCompletion::Reschedule*` variants to book the next execution.
118//!
119//! ## Retries
120//!
//! Retry behaviour comes from [`JobInitializer::retry_on_error_settings`]. Once
//! attempts are exhausted, the job is marked as errored and removed from the
//! queue.
124//!
125//! ```ignore
126//! impl JobInitializer for MyInitializer {
127//!     // ...
128//!
129//!     fn retry_on_error_settings(&self) -> RetrySettings {
130//!         RetrySettings {
131//!             n_attempts: Some(5),
132//!             min_backoff: Duration::from_secs(10),
133//!             max_backoff: Duration::from_secs(300),
134//!             ..Default::default()
135//!         }
136//!     }
137//! }
138//! ```
139//!
140//! ## Uniqueness
141//!
//! For at-most-one semantics, use [`JobSpawner::spawn_unique`]. This method
//! consumes the spawner, so the compiler rejects any attempt to spawn a second
//! job through that same spawner:
145//!
146//! ```ignore
147//! let cleanup_spawner = jobs.add_initializer(CleanupInitializer);
148//!
149//! // Consumes spawner - can't accidentally spawn twice
150//! cleanup_spawner.spawn_unique(JobId::new(), CleanupConfig::default()).await?;
151//! ```
152//!
153//! ## Parameterized Job Types
154//!
//! For cases where the job type is configured at runtime (e.g., multi-tenant inboxes),
//! store the job type in your initializer and return it from `job_type(&self)`
//! instead of hard-coding it:
157//!
158//! ```ignore
159//! struct TenantJobInitializer {
160//!     job_type: JobType,
161//!     tenant_id: String,
162//! }
163//!
164//! impl JobInitializer for TenantJobInitializer {
165//!     type Config = TenantJobConfig;
166//!
167//!     fn job_type(&self) -> JobType {
168//!         self.job_type.clone()  // From instance, not hardcoded
169//!     }
170//!
171//!     // ...
172//! }
173//! ```
174//!
175//! ## Database migrations
176//!
177//! See the [setup guide](https://github.com/GaloyMoney/job/blob/main/README.md#setup)
178//! for migration options and examples.
179//!
180//! ## Feature flags
181//!
182//! - `es-entity` enables advanced integration with the [`es_entity`] crate,
183//!   allowing runners to finish with `DbOp` handles and enriching tracing/event
184//!   metadata.
185//!
186//! ## Testing with simulated time
187//!
188//! For deterministic testing of time-dependent behavior (e.g., backoff strategies),
189//! inject an artificial clock via [`JobSvcConfig::clock`]:
190//!
191//! ```ignore
//! use job::{ArtificialClockConfig, ClockHandle, JobSvcConfig};
//! use std::time::Duration;
193//!
194//! let (clock, controller) = ClockHandle::artificial(ArtificialClockConfig::manual());
195//! let config = JobSvcConfig::builder()
196//!     .pool(pool)
197//!     .clock(clock)
198//!     .build()?;
199//!
200//! // Advance time deterministically
201//! controller.advance(Duration::from_secs(60)).await;
202//! ```
203
204#![cfg_attr(feature = "fail-on-warnings", deny(warnings))]
205#![cfg_attr(feature = "fail-on-warnings", deny(clippy::all))]
206#![forbid(unsafe_code)]
207
208mod config;
209mod current;
210mod dispatcher;
211mod entity;
212mod handle;
213mod migrate;
214mod poller;
215mod registry;
216mod repo;
217mod runner;
218mod spawner;
219mod tracker;
220
221pub mod error;
222
223use tracing::instrument;
224
225use std::sync::{Arc, Mutex};
226
227pub use config::*;
228pub use current::*;
229pub use entity::{Job, JobType};
230pub use es_entity::clock::{
231    ArtificialClockConfig, ArtificialMode, Clock, ClockController, ClockHandle,
232};
233pub use migrate::*;
234pub use registry::*;
235pub use runner::*;
236pub use spawner::*;
237
238use error::*;
239use poller::*;
240use repo::*;
241
// Declares the `JobId` identifier type through es_entity's `entity_id!` macro.
// `JobId` identifies individual jobs across the public API (e.g. `Jobs::find`
// and `JobSpawner::spawn` take one).
es_entity::entity_id! { JobId }
243
244#[derive(Clone)]
245/// Primary entry point for interacting with the Job crate. Provides APIs to register job
246/// handlers, manage configuration, and control scheduling and execution.
247pub struct Jobs {
248    config: JobSvcConfig,
249    repo: Arc<JobRepo>,
250    registry: Arc<Mutex<Option<JobRegistry>>>,
251    poller_handle: Option<Arc<JobPollerHandle>>,
252    clock: ClockHandle,
253}
254
255impl Jobs {
256    /// Initialize the service using a [`JobSvcConfig`] for connection and runtime settings.
257    pub async fn init(config: JobSvcConfig) -> Result<Self, JobError> {
258        let pool = match (config.pool.clone(), config.pg_con.clone()) {
259            (Some(pool), None) => pool,
260            (None, Some(pg_con)) => {
261                let mut pool_opts = sqlx::postgres::PgPoolOptions::new();
262                if let Some(max_connections) = config.max_connections {
263                    pool_opts = pool_opts.max_connections(max_connections);
264                }
265                pool_opts.connect(&pg_con).await.map_err(JobError::Sqlx)?
266            }
267            _ => {
268                return Err(JobError::Config(
269                    "One of pg_con or pool must be set".to_string(),
270                ));
271            }
272        };
273
274        if config.exec_migrations {
275            sqlx::migrate!().run(&pool).await?;
276        }
277
278        let repo = Arc::new(JobRepo::new(&pool));
279        let registry = Arc::new(Mutex::new(Some(JobRegistry::new())));
280        let clock = config.clock.clone();
281        Ok(Self {
282            repo,
283            config,
284            registry,
285            poller_handle: None,
286            clock,
287        })
288    }
289
290    /// Start the background poller that fetches and dispatches jobs from Postgres.
291    ///
292    /// Call this only after registering every job initializer. The call consumes the internal
293    /// registry; attempting to register additional initializers or starting the poller again
294    /// afterwards will panic with `Registry has been consumed by executor`.
295    ///
296    /// # Errors
297    ///
298    /// Returns [`JobError::Sqlx`] if the poller cannot initialise its database listeners or
299    /// supporting tasks.
300    ///
301    /// # Panics
302    ///
303    /// Panics if invoked more than once, or if [`Jobs::add_initializer`] is called after the
304    /// poller has started.
305    ///
306    /// # Examples
307    ///
308    /// Register any initializers and then start the poller:
309    ///
310    /// ```no_run
311    /// use job::{
312    ///     Jobs, JobSvcConfig, Job, JobId, JobInitializer, JobRunner, JobType, JobCompletion,
313    ///     CurrentJob, JobSpawner
314    /// };
315    /// use job::error::JobError;
316    /// use async_trait::async_trait;
317    /// use serde::{Serialize, Deserialize};
318    /// use sqlx::postgres::PgPoolOptions;
319    /// use std::error::Error;
320    ///
321    /// #[derive(Debug, Serialize, Deserialize)]
322    /// struct MyConfig {
323    ///     value: i32,
324    /// }
325    ///
326    /// struct MyInitializer;
327    ///
328    /// impl JobInitializer for MyInitializer {
329    ///     type Config = MyConfig;
330    ///
331    ///     fn job_type(&self) -> JobType {
332    ///         JobType::new("example")
333    ///     }
334    ///
335    ///     fn init(&self, _job: &Job, _: JobSpawner<Self::Config>) -> Result<Box<dyn JobRunner>, Box<dyn Error>> {
336    ///         Ok(Box::new(MyRunner))
337    ///     }
338    /// }
339    ///
340    /// struct MyRunner;
341    ///
342    /// #[async_trait]
343    /// impl JobRunner for MyRunner {
344    ///     async fn run(
345    ///         &self,
346    ///         _current_job: CurrentJob,
347    ///     ) -> Result<JobCompletion, Box<dyn Error>> {
348    ///         Ok(JobCompletion::Complete)
349    ///     }
350    /// }
351    ///
352    /// # async fn example() -> Result<(), JobError> {
353    /// let pool = PgPoolOptions::new()
354    ///     .connect_lazy("postgres://postgres:password@localhost/postgres")?;
355    /// let config = JobSvcConfig::builder().pool(pool).build().unwrap();
356    /// let mut jobs = Jobs::init(config).await?;
357    ///
358    /// // Registration returns a type-safe spawner
359    /// let spawner: JobSpawner<MyConfig> = jobs.add_initializer(MyInitializer);
360    ///
361    /// jobs.start_poll().await?;
362    ///
363    /// // Use the spawner to create jobs
364    /// spawner.spawn(JobId::new(), MyConfig { value: 42 }).await?;
365    /// # Ok(())
366    /// # }
367    /// # tokio::runtime::Runtime::new().unwrap().block_on(example()).unwrap();
368    /// ```
369    ///
370    /// Calling `start_poll` again, or attempting to register a new initializer afterwards,
371    /// results in a panic:
372    ///
373    /// ```no_run
374    /// use job::{
375    ///     Jobs, JobSvcConfig, Job, JobInitializer, JobRunner, JobType, JobCompletion, CurrentJob,
376    ///     JobSpawner,
377    /// };
378    /// use job::error::JobError;
379    /// use async_trait::async_trait;
380    /// use serde::{Serialize, Deserialize};
381    /// use sqlx::postgres::PgPoolOptions;
382    /// use std::error::Error;
383    ///
384    /// #[derive(Debug, Serialize, Deserialize)]
385    /// struct MyConfig {
386    ///     value: i32,
387    /// }
388    ///
389    /// struct MyInitializer;
390    ///
391    /// impl JobInitializer for MyInitializer {
392    ///     type Config = MyConfig;
393    ///
394    ///     fn job_type(&self) -> JobType {
395    ///         JobType::new("example")
396    ///     }
397    ///
398    ///     fn init(&self, _job: &Job, _spawner: JobSpawner<Self::Config>) -> Result<Box<dyn JobRunner>, Box<dyn Error>> {
399    ///         Ok(Box::new(MyRunner))
400    ///     }
401    /// }
402    ///
403    /// struct MyRunner;
404    ///
405    /// #[async_trait]
406    /// impl JobRunner for MyRunner {
407    ///     async fn run(
408    ///         &self,
409    ///         _current_job: CurrentJob,
410    ///     ) -> Result<JobCompletion, Box<dyn Error>> {
411    ///         Ok(JobCompletion::Complete)
412    ///     }
413    /// }
414    ///
415    /// # async fn double_start() -> Result<(), JobError> {
416    /// let pool = PgPoolOptions::new()
417    ///     .connect_lazy("postgres://postgres:password@localhost/postgres")?;
418    /// let config = JobSvcConfig::builder().pool(pool).build().unwrap();
419    /// let mut jobs = Jobs::init(config).await?;
420    ///
421    /// let _spawner = jobs.add_initializer(MyInitializer);
422    ///
423    /// jobs.start_poll().await?;
424    ///
425    /// // Panics with "Registry has been consumed by executor".
426    /// // jobs.start_poll().await.unwrap();
427    ///
428    /// // Also panics because the registry moved into the poller.
429    /// // jobs.add_initializer(MyInitializer);
430    /// # Ok(())
431    /// # }
432    /// # tokio::runtime::Runtime::new().unwrap().block_on(double_start()).unwrap();
433    /// ```
434    pub async fn start_poll(&mut self) -> Result<(), JobError> {
435        let registry = self
436            .registry
437            .lock()
438            .expect("Couldn't lock Registry Mutex")
439            .take()
440            .expect("Registry has been consumed by executor");
441        self.poller_handle = Some(Arc::new(
442            JobPoller::new(
443                self.config.poller_config.clone(),
444                Arc::clone(&self.repo),
445                registry,
446                self.clock.clone(),
447            )
448            .start()
449            .await?,
450        ));
451        Ok(())
452    }
453
454    /// Register a [`JobInitializer`] and return a [`JobSpawner`] for creating jobs.
455    ///
456    /// # Examples
457    ///
458    /// ```ignore
459    /// let spawner = jobs.add_initializer(MyInitializer);
460    /// spawner.spawn(JobId::new(), MyConfig { value: 42 }).await?;
461    /// ```
462    ///
463    /// # Panics
464    ///
465    /// Panics if called after [`start_poll`](Self::start_poll).
466    pub fn add_initializer<I: JobInitializer>(&mut self, initializer: I) -> JobSpawner<I::Config> {
467        let job_type = {
468            let mut registry = self.registry.lock().expect("Couldn't lock Registry Mutex");
469            registry
470                .as_mut()
471                .expect("Registry has been consumed by executor")
472                .add_initializer(initializer)
473        };
474        JobSpawner::new(Arc::clone(&self.repo), job_type, self.clock.clone())
475    }
476
477    /// Fetch the current snapshot of a job entity by identifier.
478    #[instrument(name = "job.find", skip(self))]
479    pub async fn find(&self, id: JobId) -> Result<Job, JobError> {
480        self.repo.find_by_id(id).await
481    }
482
483    /// Returns a reference to the clock used by this job service.
484    pub fn clock(&self) -> &ClockHandle {
485        &self.clock
486    }
487
488    /// Gracefully shut down the job poller.
489    ///
490    /// This method is idempotent and can be called multiple times safely.
491    /// It will send a shutdown signal to all running jobs, wait briefly for them
492    /// to complete, and reschedule any jobs still running.
493    ///
494    /// If not called manually, shutdown will be automatically triggered when the
495    /// Jobs instance is dropped.
496    #[instrument(name = "job.shutdown", skip(self), err)]
497    pub async fn shutdown(&self) -> Result<(), JobError> {
498        if let Some(handle) = &self.poller_handle {
499            handle.shutdown().await?;
500        }
501        Ok(())
502    }
503}