//! # Underway
//!
//! ⏳ Durable step functions via Postgres.
//!
//! # Overview
//!
//! **Underway** provides durable background jobs over Postgres. Jobs are
//! composed of a sequence of one or more steps. Each step takes the output of
//! the previous step as its input. These simple workflows provide a powerful
//! interface to common deferred work use cases.
//!
//! Key Features:
//!
//! - **PostgreSQL-Backed**: Leverages PostgreSQL with `FOR UPDATE SKIP LOCKED`
//!   for reliable task storage and coordination; the general pattern is
//!   sketched after this list.
//! - **Atomic Task Management**: Enqueue tasks within your transactions and use
//!   the worker's transaction within your tasks for atomic queries.
//! - **Automatic Retries**: Configurable retry strategies ensure tasks are
//!   reliably completed, even after transient failures.
//! - **Cron-Like Scheduling**: Schedule recurring tasks with cron-like
//!   expressions for automated, time-based job execution.
//! - **Scalable and Flexible**: Easily scales from a single worker to many,
//!   enabling seamless background job processing with minimal setup.
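//!
//! Dequeuing builds on a standard PostgreSQL pattern: select the next
//! available row with `FOR UPDATE SKIP LOCKED`, so concurrent workers never
//! claim the same task and never block one another. The sketch below shows
//! the general shape of that pattern; the `task` table and its columns are
//! hypothetical stand-ins, not Underway's actual schema or queries.
//!
//! ```rust,no_run
//! use sqlx::PgPool;
//!
//! // Illustrative only: a hypothetical `task` table with `id` and `state`
//! // columns stands in for Underway's internal schema.
//! async fn dequeue_one(pool: &PgPool) -> sqlx::Result<()> {
//!     let mut tx = pool.begin().await?;
//!
//!     // Lock the next pending row; concurrent workers skip rows that are
//!     // already locked instead of blocking on them.
//!     let next: Option<(i64,)> = sqlx::query_as(
//!         "select id from task
//!          where state = 'pending'
//!          order by id
//!          limit 1
//!          for update skip locked",
//!     )
//!     .fetch_optional(&mut *tx)
//!     .await?;
//!
//!     if let Some((id,)) = next {
//!         // Run the task, then record success in the same transaction so
//!         // the state change commits atomically with the lock's release.
//!         sqlx::query("update task set state = 'succeeded' where id = $1")
//!             .bind(id)
//!             .execute(&mut *tx)
//!             .await?;
//!     }
//!
//!     tx.commit().await?;
//!     Ok(())
//! }
//! ```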
//!
//! # Examples
//!
//! Underway is suitable for many different use cases, ranging from simple
//! single-step jobs to more sophisticated multi-step jobs, where dependencies
//! are built up between steps.
//!
//! ## Welcome emails
//!
//! A common use case is deferring work that can be processed later. For
//! instance, during user registration, we might want to send a welcome email to
//! new users. Rather than handling this within the registration process (e.g.,
//! form validation, database insertion), we can offload it to run "out-of-band"
//! using Underway. By defining a job for sending the welcome email, Underway
//! ensures it gets processed in the background, without slowing down the user
//! registration flow.
//!
//! ```rust,no_run
//! use std::env;
//!
//! use serde::{Deserialize, Serialize};
//! use sqlx::PgPool;
//! use underway::{Job, To};
//!
//! // This is the input we'll provide to the job when we enqueue it.
//! #[derive(Deserialize, Serialize)]
//! struct WelcomeEmail {
//!     user_id: i32,
//!     email: String,
//!     name: String,
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Set up the database connection pool.
//!     let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
//!     let pool = PgPool::connect(database_url).await?;
//!
//!     // Run migrations.
//!     underway::run_migrations(&pool).await?;
//!
//!     // Build the job.
//!     let job = Job::builder()
//!         .step(
//!             |_cx,
//!              WelcomeEmail {
//!                  user_id,
//!                  email,
//!                  name,
//!              }| async move {
//!                 // Simulate sending an email.
//!                 println!("Sending welcome email to {name} <{email}> (user_id: {user_id})");
//!                 // Returning this indicates this is the final step.
//!                 To::done()
//!             },
//!         )
//!         .name("welcome-email")
//!         .pool(pool)
//!         .build()
//!         .await?;
//!
//!     // Here we enqueue a new job to be processed later.
//!     job.enqueue(&WelcomeEmail {
//!         user_id: 42,
//!         email: "ferris@example.com".to_string(),
//!         name: "Ferris".to_string(),
//!     })
//!     .await?;
//!
//!     // Start processing enqueued jobs.
//!     job.start().await??;
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Order receipts
//!
//! Another common use case is defining dependencies between discrete steps of a
//! job. For instance, we might generate PDF receipts for orders and then email
//! these to customers. With Underway, each step is handled separately, making
//! it easy to create a job that first generates the PDF and, once
//! completed, proceeds to send the email.
//!
//! This separation provides significant value: if the email sending service
//! is temporarily unavailable, we can retry the email step without having to
//! regenerate the PDF, avoiding unnecessary repeated work.
//!
//! ```rust,no_run
//! use std::env;
//!
//! use serde::{Deserialize, Serialize};
//! use sqlx::PgPool;
//! use underway::{Job, To};
//!
//! #[derive(Deserialize, Serialize)]
//! struct GenerateReceipt {
//!     // An order we want to generate a receipt for.
//!     order_id: i32,
//! }
//!
//! #[derive(Deserialize, Serialize)]
//! struct EmailReceipt {
//!     // An object store key to our receipt PDF.
//!     receipt_key: String,
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Set up the database connection pool.
//!     let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
//!     let pool = PgPool::connect(database_url).await?;
//!
//!     // Run migrations.
//!     underway::run_migrations(&pool).await?;
//!
//!     // Build the job.
//!     let job = Job::builder()
//!         .step(|_cx, GenerateReceipt { order_id }| async move {
//!             // Use the order ID to build a receipt PDF...
//!             let receipt_key = format!("receipts_bucket/{order_id}-receipt.pdf");
//!             // ...store the PDF in an object store.
//!
//!             // We proceed to the next step with the receipt_key as its input.
//!             To::next(EmailReceipt { receipt_key })
//!         })
//!         .step(|_cx, EmailReceipt { receipt_key }| async move {
//!             // Retrieve the PDF from the object store, and send the email.
//!             println!("Emailing receipt for {receipt_key}");
//!             To::done()
//!         })
//!         .name("order-receipt")
//!         .pool(pool)
//!         .build()
//!         .await?;
//!
//!     // Enqueue the job for the given order.
//!     job.enqueue(&GenerateReceipt { order_id: 42 }).await?;
//!
//!     // Start processing enqueued jobs.
//!     job.start().await??;
//!
//!     Ok(())
//! }
//! ```
//!
//! With this setup, if the email service is down, the `EmailReceipt` step can
//! be retried on its own, saving time and resources by never repeating the
//! expensive PDF generation.
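//!
//! Conceptually, this works because each step's input is serialized and
//! persisted before the step runs, so a retry replays the stored input rather
//! than recomputing it. The sketch below illustrates the idea with
//! `serde_json`; it is purely illustrative and not Underway's actual storage
//! code.
//!
//! ```rust,no_run
//! use serde::{Deserialize, Serialize};
//!
//! #[derive(Debug, Deserialize, PartialEq, Serialize)]
//! struct EmailReceipt {
//!     receipt_key: String,
//! }
//!
//! fn main() -> serde_json::Result<()> {
//!     // When `GenerateReceipt` completes, its output is persisted...
//!     let stored = serde_json::to_string(&EmailReceipt {
//!         receipt_key: "receipts_bucket/42-receipt.pdf".to_string(),
//!     })?;
//!
//!     // ...and any retry of the email step replays the stored input, so
//!     // the PDF is never regenerated.
//!     let replayed: EmailReceipt = serde_json::from_str(&stored)?;
//!     assert_eq!(replayed.receipt_key, "receipts_bucket/42-receipt.pdf");
//!     Ok(())
//! }
//! ```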
//!
//! ## Daily reports
//!
//! Jobs may also be run on a schedule. This makes them useful for situations
//! where we want to do things on a regular cadence, such as creating a daily
//! business report.
//!
//! ```rust,no_run
//! use std::env;
//!
//! use serde::{Deserialize, Serialize};
//! use sqlx::PgPool;
//! use underway::{Job, To};
//!
//! #[derive(Deserialize, Serialize)]
//! struct DailyReport;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Set up the database connection pool.
//!     let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
//!     let pool = PgPool::connect(database_url).await?;
//!
//!     // Run migrations.
//!     underway::run_migrations(&pool).await?;
//!
//!     // Build the job.
//!     let job = Job::builder()
//!         .step(|_cx, _| async move {
//!             // Here we would generate and store the report.
//!             To::done()
//!         })
//!         .name("daily-report")
//!         .pool(pool)
//!         .build()
//!         .await?;
//!
//!     // Set a daily schedule with the given input.
//!     let daily = "@daily[America/Los_Angeles]".parse()?;
//!     job.schedule(&daily, &DailyReport).await?;
//!
//!     // Start processing enqueued jobs.
//!     job.start().await??;
//!
//!     Ok(())
//! }
//! ```
//!
//! # Concepts
//!
//! Underway has been designed around several core concepts, which build on one
//! another to deliver a robust background-job framework:
//!
//! - [Tasks](#tasks) represent a well-structured unit of work.
//! - [Jobs](#jobs) are a series of sequential steps, where each step is a
//!   [`Task`].
//! - [Queues](#queues) provide an interface for managing task lifecycle.
//! - [Workers](#workers) interface with queues to execute tasks.
//!
//! ## Tasks
//!
//! Tasks are units of work to be executed, with clearly defined behavior and
//! input.
//!
//! This is the lowest-level concept in our design, with everything else being
//! built on top of or around this idea.
//!
//! See [`task`] for more details about tasks.
//!
//! ## Jobs
//!
//! Jobs are a series of sequential steps. Each step provides input to the next
//! step in the series.
//!
//! In most cases, applications will use jobs to define tasks instead of using
//! the `Task` trait directly.
//!
//! See [`job`] for more details about jobs.
//!
//! ## Queues
//!
//! Queues manage the task lifecycle, including enqueuing and dequeuing tasks
//! from the database.
//!
//! See [`queue`] for more details about queues.
//!
//! ## Workers
//!
//! Workers are responsible for executing tasks. They poll the queue for new
//! tasks and, when one is found, invoke the task's execute routine.
//!
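//! As a rough mental model, a worker behaves like a polling loop. The sketch
//! below is schematic only, with hypothetical `dequeue` and `execute`
//! stand-ins rather than Underway's real `Queue` and `Task` machinery.
//!
//! ```rust,no_run
//! use std::time::Duration;
//!
//! // Hypothetical stand-ins for the queue and task machinery.
//! async fn dequeue() -> Option<String> {
//!     None
//! }
//! async fn execute(_task: String) {}
//!
//! # #[tokio::main]
//! # async fn main() {
//! // Run the next available task; otherwise back off briefly before polling
//! // again.
//! loop {
//!     match dequeue().await {
//!         Some(task) => execute(task).await,
//!         None => tokio::time::sleep(Duration::from_secs(1)).await,
//!     }
//! }
//! # }
//! ```
//!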
//! See [`worker`] for more details about workers.
//!
//! ## Strata
//!
//! The Underway system is split into a **lower-level** and a **higher-level**
//! system, where the latter is the **job** abstraction and the former is
//! everything else. More specifically, the lower-level components are the
//! **queue**, **worker**, **scheduler**, and **task**. The locus of the
//! composite system is the task, with all other components being built with or
//! around it.
//!
//! ```text
//!                       ╭───────────────╮
//!                       │      Job      │
//!                       │  (impl Task)  │
//!                       ╰───────────────╯
//!                               │
//!                               ▼
//!                       ╭───────────────╮
//!                    ┏━━│     Queue     │◀━┓
//!                    ┃  ╰───────────────╯  ┃
//! ╭───────────────╮  ┃          ◊          ┃  ╭───────────────╮
//! │    Worker     │◀━┩          │          ┡━━│   Scheduler   │
//! ╰───────────────╯  │  ╭───────────────╮  │  ╰───────────────╯
//!                    └─▶│     Task      │◀─┘
//!                       ╰───────────────╯
//! ```
//!
//! These components are designed to promote a clear [separation of
//! concerns][SoC], with each having a well-defined purpose and a clear
//! boundary in relation to the other components.
//!
//! For example, queues manage the task lifecycle, encapsulating state
//! transitions and persisting the task's canonical state in the database,
//! whereas workers and schedulers interface with the queue to process tasks or
//! enqueue tasks for execution, respectively.
//!
//! At the uppermost layer, jobs are built on top of this subsystem and are an
//! implementation of the `Task` trait. Put another way, the lower-level system
//! is unaware of the concept of a "job" and treats it like any other task.
//!
//! [SoC]: https://en.wikipedia.org/wiki/Separation_of_concerns
use sqlx::{migrate, migrate::Migrator, Acquire, Postgres};

pub use crate::{
    job::{Job, To},
    queue::Queue,
    task::Task,
    worker::Worker,
};

pub mod job;
pub mod queue;
pub mod scheduler;
pub mod task;
pub mod worker;

static MIGRATOR: Migrator = migrate!();
/// Runs Underway migrations.
///
/// These migrations must be applied before queues, tasks, and workers can be
/// run.
///
/// A transaction is acquired via the provided connection and migrations are run
/// via this transaction.
///
/// As there is no direct support for specifying the schema under which the
/// migrations table will live, we manually specify this via the search path.
/// This ensures that migrations are isolated to `underway._sqlx_migrations`.
///
/// **Note**: Changes are managed within a dedicated schema, called "underway".
///
/// # Example
///
/// ```rust,no_run
/// # use tokio::runtime::Runtime;
/// use std::env;
///
/// use sqlx::PgPool;
///
/// # fn main() {
/// # let rt = Runtime::new().unwrap();
/// # rt.block_on(async {
/// // Set up the database connection pool.
/// let database_url = &env::var("DATABASE_URL")?;
/// let pool = PgPool::connect(database_url).await?;
///
/// // Run migrations.
/// underway::run_migrations(&pool).await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// # }
/// ```
pub async fn run_migrations<'a, A>(acquirable: A) -> sqlx::Result<()>
where
    A: Acquire<'a, Postgres>,
{
    let mut tx = acquirable.begin().await?;

    // Ensure the dedicated "underway" schema exists.
    sqlx::query("create schema if not exists underway;")
        .execute(&mut *tx)
        .await?;

    // Scope the migrations table to the "underway" schema via the search
    // path, since there is no direct support for specifying its schema. This
    // isolates migration state to underway._sqlx_migrations.
    sqlx::query("set local search_path to underway;")
        .execute(&mut *tx)
        .await?;

    MIGRATOR.run(&mut *tx).await?;

    tx.commit().await?;

    Ok(())
}