apalis_core/task/
data.rs

1//! Utilities for task runtime data extension.
2//!
3//! The [`Data`] type and related middleware are important for sharing state across tasks or layers within the same task.
4//!
5//! # Overview
6//!
7//! - [`Data<T>`]: Wraps a value of type `T` for sharing across tasks.
8//! - [`AddExtension<S, T>`]: Middleware for injecting shared data into a task's context.
9//! - [`MissingDataError`]: Error type for missing or unavailable data in a task context.
10//!
11//! # Usage
12//!
13//! Use [`Data`] to share application state (such as database connections, configuration, etc.) across tasks and layers. Apply this middleware using [`WorkerBuilder::data`](crate::worker::builder::WorkerBuilder::data).
14//!
15//! ## Example
16//!
17//! ```rust
18//! # use std::sync::Arc;
19//! # use apalis_core::task::data::Data;
20//! # use apalis_core::worker::builder::WorkerBuilder;
21//! # use apalis_core::backend::memory::MemoryStorage;
22//!
23//! struct State { /* ... */ }
24//!
25//! async fn email_service(email: String, state: Data<Arc<State>>) {
26//!     // Use shared state here
27//! }
28//!
29//! let state = Arc::new(State { /* ... */ });
30//! let worker = WorkerBuilder::new("tasty-avocado")
31//!     .backend(MemoryStorage::new())
32//!     .data(state)
33//!     .build(email_service);
34//! ```
35//!
36//! # Features
37//!
38//! - Type-safe access to shared data.
39//! - Integrated as middleware.
40//! - Error handling for missing data.
41//!
42//! # See Also
43//!
44//! - [`FromRequest`] trait for extracting data from task contexts.
45//! - [`Task`] type representing a unit of work.
46use std::{
47    ops::Deref,
48    task::{Context, Poll},
49};
50
51use tower_service::Service;
52
53use crate::{task::Task, task_fn::FromRequest};
54
/// Extension data for tasks.
///
/// `Data<T>` wraps a value of type `T` so it can be shared across tasks, or
/// across layers within the same task. The wrapped value is read through
/// [`Deref`], so a `Data<T>` can be used wherever a `&T` is expected.
///
/// # Examples
///
/// ```rust
/// # use std::sync::Arc;
/// # use apalis_core::task::data::Data;
/// # use apalis_core::worker::builder::WorkerBuilder;
/// # use apalis_core::backend::memory::MemoryStorage;
/// // Some shared state used throughout our application
/// struct State {
///     // ...
/// }
///
/// async fn send_email(email: String, state: Data<Arc<State>>) {
///     // Use the shared state here
/// }
///
/// let state = Arc::new(State { /* ... */ });
/// let backend = MemoryStorage::new();
///
/// let worker = WorkerBuilder::new("tasty-avocado")
///     .backend(backend)
///     .data(state)
///     .build(send_email);
/// ```
#[derive(Debug, Clone, Copy)]
pub struct Data<T>(T);

impl<T> Data<T> {
    /// Builds a new data entry wrapping `inner`.
    ///
    /// This is a `const fn`, so it can also be used to initialise constants
    /// and statics.
    pub const fn new(inner: T) -> Self {
        Self(inner)
    }
}
89
90impl<T> Deref for Data<T> {
91    type Target = T;
92    fn deref(&self) -> &Self::Target {
93        &self.0
94    }
95}
96
97impl<S, T> tower_layer::Layer<S> for Data<T>
98where
99    T: Clone + Send + Sync + 'static,
100{
101    type Service = AddExtension<S, T>;
102
103    fn layer(&self, inner: S) -> Self::Service {
104        AddExtension {
105            inner,
106            value: self.0.clone(),
107        }
108    }
109}
110
/// Middleware that adds a shareable value to the data of each task it handles.
///
/// Created by using [`Data`] as a layer around a service.
#[derive(Debug, Clone, Copy)]
pub struct AddExtension<S, T> {
    /// The wrapped service that receives the task afterwards.
    inner: S,
    /// The value cloned into every task's data before the inner service runs.
    value: T,
}
117
118impl<S, T, Args, Ctx, IdType> Service<Task<Args, Ctx, IdType>> for AddExtension<S, T>
119where
120    S: Service<Task<Args, Ctx, IdType>>,
121    T: Clone + Send + Sync + 'static,
122{
123    type Response = S::Response;
124    type Error = S::Error;
125    type Future = S::Future;
126
127    #[inline]
128    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
129        self.inner.poll_ready(cx)
130    }
131
132    fn call(&mut self, mut task: Task<Args, Ctx, IdType>) -> Self::Future {
133        task.parts.data.insert(self.value.clone());
134        self.inner.call(task)
135    }
136}
137
138/// Error type for missing data in a task's context.
139#[derive(Debug, thiserror::Error, PartialEq)]
140pub enum MissingDataError {
141    /// The type was not found in the task's data map
142    #[error("the type for key `{0}` is not available")]
143    NotFound(String),
144}
145
146impl<T: Clone + Send + Sync + 'static, Args: Sync, Ctx: Sync, IdType: Sync + Send>
147    FromRequest<Task<Args, Ctx, IdType>> for Data<T>
148{
149    type Error = MissingDataError;
150    async fn from_request(task: &Task<Args, Ctx, IdType>) -> Result<Self, Self::Error> {
151        task.parts.data.get_checked().cloned().map(Data::new)
152    }
153}