job/lib.rs
1//! `job` is an async, Postgres-backed job scheduler and runner for Rust
2//! applications. It coordinates distributed workers, tracks job history, and
3//! handles retries with predictable backoff. Inspired by earlier systems like
//! [Sidekiq](https://sidekiq.org), it focuses on running your application code
//! asynchronously, outside of request/response paths, while keeping business
6//! logic in familiar Rust async functions. The crate uses [`sqlx`] for
7//! database access and forbids `unsafe`.
8//!
9//! ## Documentation
//! - [GitHub repository](https://github.com/GaloyMoney/job)
11//! - [Cargo package](https://crates.io/crates/job)
12//!
13//! ## Highlights
14//! - Durable Postgres-backed storage so jobs survive restarts and crashes.
15//! - Automatic exponential backoff with jitter, plus opt-in infinite retries.
16//! - Concurrency controls that let many worker instances share the workload,
17//! configurable through [`JobPollerConfig`].
18//! - Optional at-most-one-per-type queueing via [`JobSpawner::spawn_unique`].
19//! - Built-in migrations that you can run automatically or embed into your own
20//! migration workflow.
21//!
22//! ## Core Concepts
23//! - **Jobs service** – [`Jobs`] owns registration, polling, and shutdown.
24//! - **Initializer** – [`JobInitializer`] registers a job type and builds a
25//! [`JobRunner`] for each execution. Defines the associated `Config` type.
26//! - **Spawner** – [`JobSpawner`] is returned from registration and provides
27//! type-safe job creation methods. Parameterized by the config type.
28//! - **Runner** – [`JobRunner`] performs the work using the provided
29//! [`CurrentJob`] context.
30//! - **Current job** – [`CurrentJob`] exposes attempt counts, execution state,
31//! and access to the Postgres pool during a run.
32//! - **Completion** – [`JobCompletion`] returns the outcome: finish, retry, or
33//! reschedule at a later time.
34//!
35//! ## Lifecycle
36//!
37//! 1. Initialize the service with [`Jobs::init`]
38//! 2. Register initializers with [`Jobs::add_initializer`] – returns a [`JobSpawner`]
39//! 3. Start polling with [`Jobs::start_poll`]
40//! 4. Use spawners to create jobs throughout your application
41//! 5. Shut down gracefully with [`Jobs::shutdown`]
42//!
43//! ## Example
44//!
//! ```ignore
//! use async_trait::async_trait;
//! use job::{
//!     CurrentJob, Job, JobCompletion, JobId, JobInitializer, JobRunner,
//!     JobSpawner, JobSvcConfig, JobType, Jobs,
//! };
//! use serde::{Deserialize, Serialize};
//!
//! // 1. Define your config (serialized to the database)
//! #[derive(Debug, Serialize, Deserialize)]
//! struct MyConfig {
//!     value: i32,
//! }
//!
//! // 2. Define your initializer
//! struct MyInitializer;
//!
//! impl JobInitializer for MyInitializer {
//!     type Config = MyConfig;
//!
//!     fn job_type(&self) -> JobType {
//!         JobType::new("my-job")
//!     }
//!
//!     fn init(
//!         &self,
//!         job: &Job,
//!         _spawner: JobSpawner<Self::Config>,
//!     ) -> Result<Box<dyn JobRunner>, Box<dyn std::error::Error>> {
//!         let config: MyConfig = job.config()?;
//!         Ok(Box::new(MyRunner { value: config.value }))
//!     }
//! }
//!
//! // 3. Define your runner
//! struct MyRunner {
//!     value: i32,
//! }
//!
//! #[async_trait]
//! impl JobRunner for MyRunner {
//!     async fn run(
//!         &self,
//!         _current_job: CurrentJob,
//!     ) -> Result<JobCompletion, Box<dyn std::error::Error>> {
//!         println!("Processing value: {}", self.value);
//!         Ok(JobCompletion::Complete)
//!     }
//! }
//!
//! // 4. Wire it up
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let config = JobSvcConfig::builder()
//!         .pg_con("postgres://user:pass@localhost/db")
//!         .build()?;
//!
//!     let mut jobs = Jobs::init(config).await?;
//!
//!     // Registration returns a type-safe spawner
//!     let spawner: JobSpawner<MyConfig> = jobs.add_initializer(MyInitializer);
//!
//!     jobs.start_poll().await?;
//!
//!     // Use the spawner to create jobs
//!     spawner.spawn(JobId::new(), MyConfig { value: 42 }).await?;
//!
//!     Ok(())
//! }
//! ```
111//!
112//! ## Scheduling
113//!
114//! Jobs run immediately once a poller claims them. If you need a future start
115//! time, schedule it up front with [`JobSpawner::spawn_at_in_op`]. After a
116//! run completes, return [`JobCompletion::Complete`] for one-off work or use the
117//! `JobCompletion::Reschedule*` variants to book the next execution.
118//!
119//! ## Retries
120//!
121//! Retry behaviour comes from [`JobInitializer::retry_on_error_settings`]. Once
122//! attempts are exhausted the job is marked as errored and removed from the
123//! queue.
124//!
125//! ```ignore
126//! impl JobInitializer for MyInitializer {
127//! // ...
128//!
129//! fn retry_on_error_settings(&self) -> RetrySettings {
130//! RetrySettings {
131//! n_attempts: Some(5),
132//! min_backoff: Duration::from_secs(10),
133//! max_backoff: Duration::from_secs(300),
134//! ..Default::default()
135//! }
136//! }
137//! }
138//! ```
139//!
140//! ## Uniqueness
141//!
//! For at-most-one semantics, use [`JobSpawner::spawn_unique`]. This method
//! consumes the spawner, so the compiler prevents spawning a second job
//! through the same handle:
145//!
146//! ```ignore
147//! let cleanup_spawner = jobs.add_initializer(CleanupInitializer);
148//!
149//! // Consumes spawner - can't accidentally spawn twice
150//! cleanup_spawner.spawn_unique(JobId::new(), CleanupConfig::default()).await?;
151//! ```
152//!
153//! ## Parameterized Job Types
154//!
//! For cases where the job type is configured at runtime (e.g., multi-tenant inboxes),
//! store the job type in your initializer and return it from `job_type(&self)`:
157//!
158//! ```ignore
159//! struct TenantJobInitializer {
160//! job_type: JobType,
161//! tenant_id: String,
162//! }
163//!
164//! impl JobInitializer for TenantJobInitializer {
165//! type Config = TenantJobConfig;
166//!
167//! fn job_type(&self) -> JobType {
168//! self.job_type.clone() // From instance, not hardcoded
169//! }
170//!
171//! // ...
172//! }
173//! ```
174//!
175//! ## Database migrations
176//!
177//! See the [setup guide](https://github.com/GaloyMoney/job/blob/main/README.md#setup)
178//! for migration options and examples.
179//!
180//! ## Feature flags
181//!
182//! - `es-entity` enables advanced integration with the [`es_entity`] crate,
183//! allowing runners to finish with `DbOp` handles and enriching tracing/event
184//! metadata.
185//!
186//! ## Testing with simulated time
187//!
188//! For deterministic testing of time-dependent behavior (e.g., backoff strategies),
189//! inject an artificial clock via [`JobSvcConfig::clock`]:
190//!
191//! ```ignore
192//! use job::{JobSvcConfig, ClockHandle, ArtificialClockConfig};
193//!
194//! let (clock, controller) = ClockHandle::artificial(ArtificialClockConfig::manual());
195//! let config = JobSvcConfig::builder()
196//! .pool(pool)
197//! .clock(clock)
198//! .build()?;
199//!
200//! // Advance time deterministically
201//! controller.advance(Duration::from_secs(60)).await;
202//! ```
203
204#![cfg_attr(feature = "fail-on-warnings", deny(warnings))]
205#![cfg_attr(feature = "fail-on-warnings", deny(clippy::all))]
206#![forbid(unsafe_code)]
207
208mod config;
209mod current;
210mod dispatcher;
211mod entity;
212mod handle;
213mod migrate;
214mod poller;
215mod registry;
216mod repo;
217mod runner;
218mod spawner;
219mod tracker;
220
221pub mod error;
222
223use tracing::instrument;
224
225use std::sync::{Arc, Mutex};
226
227pub use config::*;
228pub use current::*;
229pub use entity::{Job, JobType};
230pub use es_entity::clock::{
231 ArtificialClockConfig, ArtificialMode, Clock, ClockController, ClockHandle,
232};
233pub use migrate::*;
234pub use registry::*;
235pub use runner::*;
236pub use spawner::*;
237
238use error::*;
239use poller::*;
240use repo::*;
241
242es_entity::entity_id! { JobId }
243
244#[derive(Clone)]
245/// Primary entry point for interacting with the Job crate. Provides APIs to register job
246/// handlers, manage configuration, and control scheduling and execution.
247pub struct Jobs {
248 config: JobSvcConfig,
249 repo: Arc<JobRepo>,
250 registry: Arc<Mutex<Option<JobRegistry>>>,
251 poller_handle: Option<Arc<JobPollerHandle>>,
252 clock: ClockHandle,
253}
254
255impl Jobs {
256 /// Initialize the service using a [`JobSvcConfig`] for connection and runtime settings.
257 pub async fn init(config: JobSvcConfig) -> Result<Self, JobError> {
258 let pool = match (config.pool.clone(), config.pg_con.clone()) {
259 (Some(pool), None) => pool,
260 (None, Some(pg_con)) => {
261 let mut pool_opts = sqlx::postgres::PgPoolOptions::new();
262 if let Some(max_connections) = config.max_connections {
263 pool_opts = pool_opts.max_connections(max_connections);
264 }
265 pool_opts.connect(&pg_con).await.map_err(JobError::Sqlx)?
266 }
267 _ => {
268 return Err(JobError::Config(
269 "One of pg_con or pool must be set".to_string(),
270 ));
271 }
272 };
273
274 if config.exec_migrations {
275 sqlx::migrate!().run(&pool).await?;
276 }
277
278 let repo = Arc::new(JobRepo::new(&pool));
279 let registry = Arc::new(Mutex::new(Some(JobRegistry::new())));
280 let clock = config.clock.clone();
281 Ok(Self {
282 repo,
283 config,
284 registry,
285 poller_handle: None,
286 clock,
287 })
288 }
289
290 /// Start the background poller that fetches and dispatches jobs from Postgres.
291 ///
292 /// Call this only after registering every job initializer. The call consumes the internal
293 /// registry; attempting to register additional initializers or starting the poller again
294 /// afterwards will panic with `Registry has been consumed by executor`.
295 ///
296 /// # Errors
297 ///
298 /// Returns [`JobError::Sqlx`] if the poller cannot initialise its database listeners or
299 /// supporting tasks.
300 ///
301 /// # Panics
302 ///
303 /// Panics if invoked more than once, or if [`Jobs::add_initializer`] is called after the
304 /// poller has started.
305 ///
306 /// # Examples
307 ///
308 /// Register any initializers and then start the poller:
309 ///
310 /// ```no_run
311 /// use job::{
312 /// Jobs, JobSvcConfig, Job, JobId, JobInitializer, JobRunner, JobType, JobCompletion,
313 /// CurrentJob, JobSpawner
314 /// };
315 /// use job::error::JobError;
316 /// use async_trait::async_trait;
317 /// use serde::{Serialize, Deserialize};
318 /// use sqlx::postgres::PgPoolOptions;
319 /// use std::error::Error;
320 ///
321 /// #[derive(Debug, Serialize, Deserialize)]
322 /// struct MyConfig {
323 /// value: i32,
324 /// }
325 ///
326 /// struct MyInitializer;
327 ///
328 /// impl JobInitializer for MyInitializer {
329 /// type Config = MyConfig;
330 ///
331 /// fn job_type(&self) -> JobType {
332 /// JobType::new("example")
333 /// }
334 ///
335 /// fn init(&self, _job: &Job, _: JobSpawner<Self::Config>) -> Result<Box<dyn JobRunner>, Box<dyn Error>> {
336 /// Ok(Box::new(MyRunner))
337 /// }
338 /// }
339 ///
340 /// struct MyRunner;
341 ///
342 /// #[async_trait]
343 /// impl JobRunner for MyRunner {
344 /// async fn run(
345 /// &self,
346 /// _current_job: CurrentJob,
347 /// ) -> Result<JobCompletion, Box<dyn Error>> {
348 /// Ok(JobCompletion::Complete)
349 /// }
350 /// }
351 ///
352 /// # async fn example() -> Result<(), JobError> {
353 /// let pool = PgPoolOptions::new()
354 /// .connect_lazy("postgres://postgres:password@localhost/postgres")?;
355 /// let config = JobSvcConfig::builder().pool(pool).build().unwrap();
356 /// let mut jobs = Jobs::init(config).await?;
357 ///
358 /// // Registration returns a type-safe spawner
359 /// let spawner: JobSpawner<MyConfig> = jobs.add_initializer(MyInitializer);
360 ///
361 /// jobs.start_poll().await?;
362 ///
363 /// // Use the spawner to create jobs
364 /// spawner.spawn(JobId::new(), MyConfig { value: 42 }).await?;
365 /// # Ok(())
366 /// # }
367 /// # tokio::runtime::Runtime::new().unwrap().block_on(example()).unwrap();
368 /// ```
369 ///
370 /// Calling `start_poll` again, or attempting to register a new initializer afterwards,
371 /// results in a panic:
372 ///
373 /// ```no_run
374 /// use job::{
375 /// Jobs, JobSvcConfig, Job, JobInitializer, JobRunner, JobType, JobCompletion, CurrentJob,
376 /// JobSpawner,
377 /// };
378 /// use job::error::JobError;
379 /// use async_trait::async_trait;
380 /// use serde::{Serialize, Deserialize};
381 /// use sqlx::postgres::PgPoolOptions;
382 /// use std::error::Error;
383 ///
384 /// #[derive(Debug, Serialize, Deserialize)]
385 /// struct MyConfig {
386 /// value: i32,
387 /// }
388 ///
389 /// struct MyInitializer;
390 ///
391 /// impl JobInitializer for MyInitializer {
392 /// type Config = MyConfig;
393 ///
394 /// fn job_type(&self) -> JobType {
395 /// JobType::new("example")
396 /// }
397 ///
398 /// fn init(&self, _job: &Job, _spawner: JobSpawner<Self::Config>) -> Result<Box<dyn JobRunner>, Box<dyn Error>> {
399 /// Ok(Box::new(MyRunner))
400 /// }
401 /// }
402 ///
403 /// struct MyRunner;
404 ///
405 /// #[async_trait]
406 /// impl JobRunner for MyRunner {
407 /// async fn run(
408 /// &self,
409 /// _current_job: CurrentJob,
410 /// ) -> Result<JobCompletion, Box<dyn Error>> {
411 /// Ok(JobCompletion::Complete)
412 /// }
413 /// }
414 ///
415 /// # async fn double_start() -> Result<(), JobError> {
416 /// let pool = PgPoolOptions::new()
417 /// .connect_lazy("postgres://postgres:password@localhost/postgres")?;
418 /// let config = JobSvcConfig::builder().pool(pool).build().unwrap();
419 /// let mut jobs = Jobs::init(config).await?;
420 ///
421 /// let _spawner = jobs.add_initializer(MyInitializer);
422 ///
423 /// jobs.start_poll().await?;
424 ///
425 /// // Panics with "Registry has been consumed by executor".
426 /// // jobs.start_poll().await.unwrap();
427 ///
428 /// // Also panics because the registry moved into the poller.
429 /// // jobs.add_initializer(MyInitializer);
430 /// # Ok(())
431 /// # }
432 /// # tokio::runtime::Runtime::new().unwrap().block_on(double_start()).unwrap();
433 /// ```
434 pub async fn start_poll(&mut self) -> Result<(), JobError> {
435 let registry = self
436 .registry
437 .lock()
438 .expect("Couldn't lock Registry Mutex")
439 .take()
440 .expect("Registry has been consumed by executor");
441 self.poller_handle = Some(Arc::new(
442 JobPoller::new(
443 self.config.poller_config.clone(),
444 Arc::clone(&self.repo),
445 registry,
446 self.clock.clone(),
447 )
448 .start()
449 .await?,
450 ));
451 Ok(())
452 }
453
454 /// Register a [`JobInitializer`] and return a [`JobSpawner`] for creating jobs.
455 ///
456 /// # Examples
457 ///
458 /// ```ignore
459 /// let spawner = jobs.add_initializer(MyInitializer);
460 /// spawner.spawn(JobId::new(), MyConfig { value: 42 }).await?;
461 /// ```
462 ///
463 /// # Panics
464 ///
465 /// Panics if called after [`start_poll`](Self::start_poll).
466 pub fn add_initializer<I: JobInitializer>(&mut self, initializer: I) -> JobSpawner<I::Config> {
467 let job_type = {
468 let mut registry = self.registry.lock().expect("Couldn't lock Registry Mutex");
469 registry
470 .as_mut()
471 .expect("Registry has been consumed by executor")
472 .add_initializer(initializer)
473 };
474 JobSpawner::new(Arc::clone(&self.repo), job_type, self.clock.clone())
475 }
476
477 /// Fetch the current snapshot of a job entity by identifier.
478 #[instrument(name = "job.find", skip(self))]
479 pub async fn find(&self, id: JobId) -> Result<Job, JobError> {
480 self.repo.find_by_id(id).await
481 }
482
483 /// Returns a reference to the clock used by this job service.
484 pub fn clock(&self) -> &ClockHandle {
485 &self.clock
486 }
487
488 /// Gracefully shut down the job poller.
489 ///
490 /// This method is idempotent and can be called multiple times safely.
491 /// It will send a shutdown signal to all running jobs, wait briefly for them
492 /// to complete, and reschedule any jobs still running.
493 ///
494 /// If not called manually, shutdown will be automatically triggered when the
495 /// Jobs instance is dropped.
496 #[instrument(name = "job.shutdown", skip(self), err)]
497 pub async fn shutdown(&self) -> Result<(), JobError> {
498 if let Some(handle) = &self.poller_handle {
499 handle.shutdown().await?;
500 }
501 Ok(())
502 }
503}