apalis_core/task/mod.rs
1//! Utilities for creating and managing tasks.
2//!
3//! The [`Task`] component encapsulates a unit of work to be executed,
4//! along with its associated context, metadata, and execution status. The [`Parts`]
5//! struct contains metadata, attempt tracking, extensions, and scheduling information for each task.
6//!
7//! # Overview
8//!
9//! In `apalis`, tasks are designed to represent discrete units of work that can be scheduled, retried, and tracked
10//! throughout their lifecycle. Each task consists of arguments (`args`) describing the work to be performed,
11//! and a [`Parts`] value (`parts`) containing metadata and control information.
12//!
13//! ## [`Task`]
14//!
15//! The [`Task`] struct is generic over:
16//! - `Args`: The type of arguments or payload for the task.
17//! - `Ctx`: Context data associated with the task, such as custom fields or backend-specific information.
18//! - `IdType`: The type used for uniquely identifying the task (defaults to [`RandomId`]).
19//!
20//! ## [`Parts`]
21//!
22//! The [`Parts`] struct provides the following:
23//! - `task_id`: Optionally stores a unique identifier for the task.
24//! - `data`: An [`Extensions`] container for storing arbitrary per-task data (e.g., middleware extensions).
25//! - `attempt`: Tracks how many times the task has been attempted.
26//! - `ctx`: Custom context for the task, provided by the backend or user.
27//! - `status`: The current [`Status`] of the task (e.g., Pending, Running, Completed, Failed).
28//! - `run_at`: The UNIX timestamp (in seconds) when the task should be run.
29//!
30//! The execution context is essential for tracking the state and metadata of a task as it moves through
31//! the system. It enables features such as retries, scheduling, and extensibility via the `Extensions` type.
32//!
33//! # Modules
34//!
35//! - [`attempt`]: Tracks the number of times a task has been attempted.
36//! - [`builder`]: Utilities for constructing tasks.
37//! - [`data`]: Data types for task payloads.
38//! - [`extensions`]: Extension storage for tasks.
39//! - [`metadata`]: Metadata types for tasks.
40//! - [`status`]: Status tracking for tasks.
41//! - [`task_id`]: Types for uniquely identifying tasks.
42//!
43//! # Examples
44//!
45//! ## Creating a new task with default metadata
46//!
47//! ```rust
48//! # use apalis_core::task::{Task, Parts};
49//! # use apalis_core::task::builder::TaskBuilder;
50//! let task: Task<String, ()> = TaskBuilder::new("my work".to_string()).build();
51//! ```
52//!
53//! ## Creating a task with custom metadata
54//!
55//! ```rust
56//! # use apalis_core::task::{Task, Parts};
57//! # use apalis_core::task::builder::TaskBuilder;
58//! # use apalis_core::task::extensions::Extensions;
59//!
60//! #[derive(Default, Clone)]
61//! struct MyCtx { priority: u8 }
62//! let task: Task<String, Extensions> = TaskBuilder::new("important work".to_string())
63//! .meta(MyCtx { priority: 5 })
64//! .build();
65//! ```
66//!
67//! ## Accessing and modifying the execution context
68//!
69//! ```rust
70//! use apalis_core::task::{Task, Parts, status::Status};
71//! let mut task = Task::<String, ()>::new("work".to_string());
72//! task.parts.status = Status::Running.into();
73//! task.parts.attempt.increment();
74//! ```
75//!
76//! ## Using Extensions for per-task data
77//!
78//! ```rust
79//! # use apalis_core::task::builder::TaskBuilder;
80//! use apalis_core::task::{Task, extensions::Extensions};
81//! #[derive(Debug, Clone, PartialEq)]
82//! pub struct TracingId(String);
83//! let mut extensions = Extensions::default();
84//! extensions.insert(TracingId("abc123".to_owned()));
85//! let task: Task<String, ()> = TaskBuilder::new("work".to_string()).with_data(extensions).build();
86//! assert_eq!(task.parts.data.get::<TracingId>(), Some(&TracingId("abc123".to_owned())));
87//! ```
88//!
89//! # See Also
90//!
91//! - [`Task`]: Represents a unit of work to be executed.
92//! - [`Parts`]: Holds metadata, status, and control information for a task.
93//! - [`Extensions`]: Type-safe storage for per-task data.
94//! - [`Status`]: Enum representing the lifecycle state of a task.
95//! - [`Attempt`]: Tracks the number of execution attempts for a task.
96//! - [`TaskId`]: Unique identifier type for tasks.
97//! - [`FromRequest`]: Trait for extracting data from task contexts.
98//! - [`IntoResponse`]: Trait for converting tasks into response types.
99//! - [`TaskBuilder`]: Fluent builder for constructing tasks with optional configuration.
100//! - [`RandomId`]: Default unique identifier type for tasks.
101//!
102//! [`TaskBuilder`]: crate::task::builder::TaskBuilder
103//! [`IntoResponse`]: crate::task_fn::into_response::IntoResponse
104//! [`FromRequest`]: crate::task_fn::from_request::FromRequest
105
106use std::{
107 fmt::Debug,
108 time::{SystemTime, UNIX_EPOCH},
109};
110
111use crate::{
112 backend::queue::Queue,
113 task::{
114 attempt::Attempt,
115 builder::TaskBuilder,
116 extensions::Extensions,
117 status::{AtomicStatus, Status},
118 task_id::{RandomId, TaskId},
119 },
120 task_fn::FromRequest,
121};
122
123pub mod attempt;
124pub mod builder;
125pub mod data;
126pub mod extensions;
127pub mod metadata;
128pub mod status;
129pub mod task_id;
130
131/// Represents a task which will be executed
132/// Should be considered a single unit of work
133#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
134#[derive(Debug, Clone, Default)]
135pub struct Task<Args, Context, IdType = RandomId> {
136 /// The argument task part
137 pub args: Args,
138 /// Parts of the task eg id, attempts and context
139 pub parts: Parts<Context, IdType>,
140}
141
142/// Component parts of a `Task`
143#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
144#[derive(Default)]
145pub struct Parts<Context, IdType = RandomId> {
146 /// The task's id if allocated
147 pub task_id: Option<TaskId<IdType>>,
148
149 /// The tasks's extensions
150 #[cfg_attr(feature = "serde", serde(skip))]
151 pub data: Extensions,
152
153 /// The tasks's attempts
154 /// Keeps track of the number of attempts a task has been worked on
155 pub attempt: Attempt,
156
157 /// The task specific data provided by the backend
158 pub ctx: Context,
159
160 /// The task status that is wrapped in an atomic status
161 pub status: AtomicStatus,
162
163 /// The time a task should be run
164 pub run_at: u64,
165
166 /// The queue the task belongs to
167 /// This is a runtime only field and is not serialized
168 /// It is set by the backend when the task is fetched from the queue
169 /// Workers can use this to determine which queue they are processing tasks from
170 /// Sinks can also use this to determine which queue to send the task to
171 /// This field is optional as not all backends support queues
172 /// For example, a simple in-memory backend does not have the concept of queues
173 #[cfg_attr(feature = "serde", serde(skip))]
174 pub queue: Option<Queue>,
175}
176
177impl<Ctx: Debug, IdType: Debug> Debug for Parts<Ctx, IdType> {
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 f.debug_struct("Parts")
180 .field("task_id", &self.task_id)
181 .field("data", &"<Extensions>")
182 .field("attempt", &self.attempt)
183 .field("ctx", &self.ctx)
184 .field("status", &self.status.load())
185 .field("run_at", &self.run_at)
186 .finish()
187 }
188}
189
190impl<Ctx, IdType: Clone> Clone for Parts<Ctx, IdType>
191where
192 Ctx: Clone,
193{
194 fn clone(&self) -> Self {
195 Self {
196 task_id: self.task_id.clone(),
197 data: self.data.clone(),
198 attempt: self.attempt.clone(),
199 ctx: self.ctx.clone(),
200 status: self.status.clone(),
201 run_at: self.run_at,
202 queue: self.queue.clone(),
203 }
204 }
205}
206
207impl<Args, Ctx, IdType> Task<Args, Ctx, IdType> {
208 /// Creates a new [Task]
209 pub fn new(args: Args) -> Self
210 where
211 Ctx: Default,
212 {
213 Self::new_with_data(args, Extensions::default(), Ctx::default())
214 }
215
216 /// Creates a task with context provided
217 pub fn new_with_ctx(req: Args, ctx: Ctx) -> Self {
218 Self {
219 args: req,
220 parts: Parts {
221 ctx,
222 task_id: Default::default(),
223 attempt: Default::default(),
224 data: Default::default(),
225 status: Status::Pending.into(),
226 run_at: {
227 let now = SystemTime::now();
228 let duration_since_epoch =
229 now.duration_since(UNIX_EPOCH).expect("Time went backwards");
230 duration_since_epoch.as_secs()
231 },
232 queue: None,
233 },
234 }
235 }
236
237 /// Creates a task with data and context provided
238 pub fn new_with_data(req: Args, data: Extensions, ctx: Ctx) -> Self {
239 Self {
240 args: req,
241 parts: Parts {
242 ctx,
243 task_id: Default::default(),
244 attempt: Default::default(),
245 data,
246 status: Status::Pending.into(),
247 run_at: {
248 let now = SystemTime::now();
249 let duration_since_epoch =
250 now.duration_since(UNIX_EPOCH).expect("Time went backwards");
251 duration_since_epoch.as_secs()
252 },
253 queue: None,
254 },
255 }
256 }
257
258 /// Take the task into its parts
259 pub fn take(self) -> (Args, Parts<Ctx, IdType>) {
260 (self.args, self.parts)
261 }
262
263 /// Extract a value of type `T` from the task's context
264 ///
265 /// Uses [FromRequest] trait to extract the value.
266 pub async fn extract<T: FromRequest<Self>>(&self) -> Result<T, T::Error> {
267 T::from_request(self).await
268 }
269
270 /// Converts the task into a builder pattern.
271 pub fn into_builder(self) -> TaskBuilder<Args, Ctx, IdType> {
272 TaskBuilder {
273 args: self.args,
274 ctx: self.parts.ctx,
275 attempt: Some(self.parts.attempt),
276 data: self.parts.data,
277 status: Some(self.parts.status.into()),
278 run_at: Some(self.parts.run_at),
279 task_id: self.parts.task_id,
280 queue: self.parts.queue,
281 }
282 }
283}
284
285impl<Args, Ctx, IdType> Task<Args, Ctx, IdType> {
286 /// Maps the `args` field using the provided function, consuming the task.
287 pub fn try_map<F, NewArgs, Err>(self, f: F) -> Result<Task<NewArgs, Ctx, IdType>, Err>
288 where
289 F: FnOnce(Args) -> Result<NewArgs, Err>,
290 {
291 Ok(Task {
292 args: f(self.args)?,
293 parts: self.parts,
294 })
295 }
296 /// Maps the `args` field using the provided function, consuming the task.
297 pub fn map<F, NewArgs>(self, f: F) -> Task<NewArgs, Ctx, IdType>
298 where
299 F: FnOnce(Args) -> NewArgs,
300 {
301 Task {
302 args: f(self.args),
303 parts: self.parts,
304 }
305 }
306
307 /// Maps both `args` and `parts` together.
308 pub fn map_all<F, NewArgs, NewCtx>(self, f: F) -> Task<NewArgs, NewCtx, IdType>
309 where
310 F: FnOnce(Args, Parts<Ctx, IdType>) -> (NewArgs, Parts<NewCtx, IdType>),
311 {
312 let (args, parts) = f(self.args, self.parts);
313 Task { args, parts: parts }
314 }
315
316 /// Maps only the `parts` field.
317 pub fn map_parts<F, NewCtx>(self, f: F) -> Task<Args, NewCtx, IdType>
318 where
319 F: FnOnce(Parts<Ctx, IdType>) -> Parts<NewCtx, IdType>,
320 {
321 Task {
322 args: self.args,
323 parts: f(self.parts),
324 }
325 }
326}