Skip to main content

apalis_core/task/
mod.rs

1//! Utilities for creating and managing tasks.
2//!
3//! The [`Task`] component encapsulates a unit of work to be executed,
4//! along with its associated context, metadata, and execution status. The [`Parts`]
5//! struct contains metadata, attempt tracking, extensions, and scheduling information for each task.
6//!
7//! # Overview
8//!
9//! In `apalis`, tasks are designed to represent discrete units of work that can be scheduled, retried, and tracked
10//! throughout their lifecycle. Each task consists of arguments (`args`) describing the work to be performed,
11//! and an [`Parts`] (`parts`) containing metadata and control information.
12//!
13//! ## [`Task`]
14//!
15//! The [`Task`] struct is generic over:
16//! - `Args`: The type of arguments or payload for the task.
17//! - `Ctx`: Ctxdata associated with the task, such as custom fields or backend-specific information.
18//! - `IdType`: The type used for uniquely identifying the task (defaults to [`RandomId`]).
19//!
20//! ## [`Parts`]
21//!
22//! The [`Parts`] struct provides the following:
23//! - `task_id`: Optionally stores a unique identifier for the task.
24//! - `data`: An [`Extensions`] container for storing arbitrary per-task data (e.g., middleware extensions).
25//! - `attempt`: Tracks how many times the task has been attempted.
26//! - `metadata`: Custom metadata for the task, provided by the backend or user.
27//! - `status`: The current [`Status`] of the task (e.g., Pending, Running, Completed, Failed).
28//! - `run_at`: The UNIX timestamp (in seconds) when the task should be run.
29//!
30//! The execution context is essential for tracking the state and metadata of a task as it moves through
31//! the system. It enables features such as retries, scheduling, and extensibility via the `Extensions` type.
32//!
33//! # Modules
34//!
35//! - [`attempt`]: Tracks the number of attempts a task has been executed.
36//! - [`builder`]: Utilities for constructing tasks.
37//! - [`data`]: Data types for task payloads.
38//! - [`extensions`]: Extension storage for tasks.
39//! - [`metadata`]: Ctxdata types for tasks.
40//! - [`status`]: Status tracking for tasks.
41//! - [`task_id`]: Types for uniquely identifying tasks.
42//!
43//! # Examples
44//!
45//! ## Creating a new task with default metadata
46//!
47//! ```rust
48//! # use apalis_core::task::{Task, Parts};
49//! # use apalis_core::task::builder::TaskBuilder;
50//! # use apalis_core::task::task_id::RandomId;
51//! let task: Task<String, (), RandomId> = TaskBuilder::new("my work".to_string()).build();
52//! ```
53//!
54//! ## Creating a task with custom metadata
55//!
56//! ```rust
57//! # use apalis_core::task::{Task, Parts};
58//! # use apalis_core::task::builder::TaskBuilder;
59//! # use apalis_core::task::extensions::Extensions;
60//! # use apalis_core::task::task_id::RandomId;
61//!
62//! #[derive(Default, Clone)]
63//! struct MyCtx { priority: u8 }
64//! let task: Task<String, Extensions, RandomId> = TaskBuilder::new("important work".to_string())
65//!     .meta(MyCtx { priority: 5 })
66//!     .build();
67//! ```
68//!
69//! ## Accessing and modifying the execution context
70//!
71//! ```rust
72//! # use apalis_core::task::task_id::RandomId;
73//! use apalis_core::task::{Task, Parts, status::Status};
74//! let mut task = Task::<String, (), RandomId>::new("work".to_string());
75//! task.parts.status = Status::Running.into();
76//! task.parts.attempt.increment();
77//! ```
78//!
79//! ## Using Extensions for per-task data
80//!
81//! ```rust
82//! # use apalis_core::task::builder::TaskBuilder;
83//! # use apalis_core::task::task_id::RandomId;
84//! use apalis_core::task::{Task, extensions::Extensions};
85//! #[derive(Debug, Clone, PartialEq)]
86//! pub struct TracingId(String);
87//! let mut extensions = Extensions::default();
88//! extensions.insert(TracingId("abc123".to_owned()));
89//! let task: Task<String, (), RandomId> = TaskBuilder::new("work".to_string()).with_data(extensions).build();
90//! assert_eq!(task.parts.data.get::<TracingId>(), Some(&TracingId("abc123".to_owned())));
91//! ```
92//!
93//! # See Also
94//!
95//! - [`Task`]: Represents a unit of work to be executed.
96//! - [`Parts`]: Holds metadata, status, and control information for a task.
97//! - [`Extensions`]: Type-safe storage for per-task data.
98//! - [`Status`]: Enum representing the lifecycle state of a task.
99//! - [`Attempt`]: Tracks the number of execution attempts for a task.
100//! - [`TaskId`]: Unique identifier type for tasks.
101//! - [`FromRequest`]: Trait for extracting data from task contexts.
102//! - [`IntoResponse`]: Trait for converting tasks into response types.
103//! - [`TaskBuilder`]: Fluent builder for constructing tasks with optional configuration.
104//! - [`RandomId`]: Default unique identifier type for tasks.
105//!
106//! [`TaskBuilder`]: crate::task::builder::TaskBuilder
107//! [`IntoResponse`]: crate::task_fn::into_response::IntoResponse
108//! [`FromRequest`]: crate::task_fn::from_request::FromRequest
109
110use std::{
111    fmt::Debug,
112    time::{SystemTime, UNIX_EPOCH},
113};
114
115use crate::{
116    task::{
117        attempt::Attempt,
118        builder::TaskBuilder,
119        extensions::Extensions,
120        status::{AtomicStatus, Status},
121        task_id::TaskId,
122    },
123    task_fn::FromRequest,
124};
125
126pub mod attempt;
127pub mod builder;
128pub mod data;
129pub mod extensions;
130pub mod metadata;
131pub mod status;
132pub mod task_id;
133
134/// Represents a task which will be executed
135/// Should be considered a single unit of work
136#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
137#[derive(Debug, Clone, Default)]
138pub struct Task<Args, Context, IdType> {
139    /// The argument task part
140    pub args: Args,
141    /// Parts of the task eg id, attempts and context
142    pub parts: Parts<Context, IdType>,
143}
144
145/// Component parts of a `Task`
146#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
147#[derive(Default)]
148pub struct Parts<Context, IdType> {
149    /// The task's id if allocated
150    pub task_id: Option<TaskId<IdType>>,
151
152    /// The tasks's extensions
153    #[cfg_attr(feature = "serde", serde(skip))]
154    pub data: Extensions,
155
156    /// The tasks's attempts
157    /// Keeps track of the number of attempts a task has been worked on
158    pub attempt: Attempt,
159
160    /// The task specific data provided by the backend
161    pub ctx: Context,
162
163    /// The task status that is wrapped in an atomic status
164    pub status: AtomicStatus,
165
166    /// The time a task should be run
167    pub run_at: u64,
168
169    /// Adds a unique key to enforce job uniqueness when used
170    pub idempotency_key: Option<String>,
171}
172
173impl<Ctx: Debug, IdType: Debug> Debug for Parts<Ctx, IdType> {
174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        f.debug_struct("Parts")
176            .field("task_id", &self.task_id)
177            .field("data", &"<Extensions>")
178            .field("attempt", &self.attempt)
179            .field("ctx", &self.ctx)
180            .field("status", &self.status.load())
181            .field("run_at", &self.run_at)
182            .field("idempotency_key", &self.idempotency_key)
183            .finish()
184    }
185}
186
187impl<Ctx, IdType: Clone> Clone for Parts<Ctx, IdType>
188where
189    Ctx: Clone,
190{
191    fn clone(&self) -> Self {
192        Self {
193            task_id: self.task_id.clone(),
194            data: self.data.clone(),
195            attempt: self.attempt.clone(),
196            ctx: self.ctx.clone(),
197            status: self.status.clone(),
198            run_at: self.run_at,
199            idempotency_key: self.idempotency_key.clone(),
200        }
201    }
202}
203
204impl<Args, Ctx, IdType> Task<Args, Ctx, IdType> {
205    /// Creates a new [Task]
206    pub fn new(args: Args) -> Self
207    where
208        Ctx: Default,
209    {
210        Self::new_with_data(args, Extensions::default(), Ctx::default())
211    }
212
213    /// Creates a task with context provided
214    pub fn new_with_ctx(req: Args, ctx: Ctx) -> Self {
215        Self {
216            args: req,
217            parts: Parts {
218                ctx,
219                task_id: Default::default(),
220                attempt: Default::default(),
221                data: Default::default(),
222                status: Status::Pending.into(),
223                run_at: {
224                    let now = SystemTime::now();
225                    let duration_since_epoch =
226                        now.duration_since(UNIX_EPOCH).expect("Time went backwards");
227                    duration_since_epoch.as_secs()
228                },
229                idempotency_key: Default::default(),
230            },
231        }
232    }
233
234    /// Creates a task with data and context provided
235    pub fn new_with_data(req: Args, data: Extensions, ctx: Ctx) -> Self {
236        Self {
237            args: req,
238            parts: Parts {
239                ctx,
240                task_id: Default::default(),
241                attempt: Default::default(),
242                data,
243                status: Status::Pending.into(),
244                run_at: {
245                    let now = SystemTime::now();
246                    let duration_since_epoch =
247                        now.duration_since(UNIX_EPOCH).expect("Time went backwards");
248                    duration_since_epoch.as_secs()
249                },
250                idempotency_key: Default::default(),
251            },
252        }
253    }
254
255    /// Take the task into its parts
256    pub fn take(self) -> (Args, Parts<Ctx, IdType>) {
257        (self.args, self.parts)
258    }
259
260    /// Extract a value of type `T` from the task's context
261    ///
262    /// Uses [FromRequest] trait to extract the value.
263    pub async fn extract<T: FromRequest<Self>>(&self) -> Result<T, T::Error> {
264        T::from_request(self).await
265    }
266
267    /// Converts the task into a builder pattern.
268    pub fn into_builder(self) -> TaskBuilder<Args, Ctx, IdType> {
269        TaskBuilder {
270            args: self.args,
271            ctx: self.parts.ctx,
272            attempt: Some(self.parts.attempt),
273            data: self.parts.data,
274            status: Some(self.parts.status.into()),
275            run_at: Some(self.parts.run_at),
276            task_id: self.parts.task_id,
277            idempotency_key: self.parts.idempotency_key,
278        }
279    }
280}
281
282impl<Args, Ctx, IdType> Task<Args, Ctx, IdType> {
283    /// Maps the `args` field using the provided function, consuming the task.
284    pub fn try_map<F, NewArgs, Err>(self, f: F) -> Result<Task<NewArgs, Ctx, IdType>, Err>
285    where
286        F: FnOnce(Args) -> Result<NewArgs, Err>,
287    {
288        Ok(Task {
289            args: f(self.args)?,
290            parts: self.parts,
291        })
292    }
293    /// Maps the `args` field using the provided function, consuming the task.
294    pub fn map<F, NewArgs>(self, f: F) -> Task<NewArgs, Ctx, IdType>
295    where
296        F: FnOnce(Args) -> NewArgs,
297    {
298        Task {
299            args: f(self.args),
300            parts: self.parts,
301        }
302    }
303
304    /// Maps both `args` and `parts` together.
305    pub fn map_all<F, NewArgs, NewCtx>(self, f: F) -> Task<NewArgs, NewCtx, IdType>
306    where
307        F: FnOnce(Args, Parts<Ctx, IdType>) -> (NewArgs, Parts<NewCtx, IdType>),
308    {
309        let (args, parts) = f(self.args, self.parts);
310        Task { args, parts }
311    }
312
313    /// Maps only the `parts` field.
314    pub fn map_parts<F, NewCtx>(self, f: F) -> Task<Args, NewCtx, IdType>
315    where
316        F: FnOnce(Parts<Ctx, IdType>) -> Parts<NewCtx, IdType>,
317    {
318        Task {
319            args: self.args,
320            parts: f(self.parts),
321        }
322    }
323}