apalis_sql/
context.rs

1use std::convert::Infallible;
2
3use apalis_core::{
4    task::{Task, metadata::MetadataExt},
5    task_fn::FromRequest,
6};
7
8type JsonMapMetadata = serde_json::Map<String, serde_json::Value>;
9
10use serde::{
11    Deserialize, Serialize,
12    de::{DeserializeOwned, Error},
13};
14
15/// The SQL context used for jobs stored in a SQL database
16#[derive(Debug, Serialize, Deserialize)]
17pub struct SqlContext<Pool> {
18    max_attempts: i32,
19    last_result: Option<serde_json::Value>,
20    lock_at: Option<i64>,
21    lock_by: Option<String>,
22    done_at: Option<i64>,
23    priority: i32,
24    queue: Option<String>,
25    meta: JsonMapMetadata,
26    // Marker to hold the Pool type
27    // Used to associate the context with a specific database pool type
28    _pool: std::marker::PhantomData<Pool>,
29}
30
31impl<Pool> Clone for SqlContext<Pool> {
32    fn clone(&self) -> Self {
33        Self {
34            lock_at: self.lock_at,
35            done_at: self.done_at,
36            max_attempts: self.max_attempts,
37            last_result: self.last_result.clone(),
38            lock_by: self.lock_by.clone(),
39            priority: self.priority,
40            queue: self.queue.clone(),
41            meta: self.meta.clone(),
42            _pool: std::marker::PhantomData,
43        }
44    }
45}
46
47impl<Pool> Default for SqlContext<Pool> {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53impl<Pool> SqlContext<Pool> {
54    /// Build a new context with defaults
55    #[must_use]
56    pub fn new() -> Self {
57        Self {
58            lock_at: None,
59            done_at: None,
60            max_attempts: 5,
61            last_result: None,
62            lock_by: None,
63            priority: 0,
64            queue: None,
65            meta: Default::default(),
66            _pool: std::marker::PhantomData,
67        }
68    }
69
70    /// Set the number of attempts
71    #[must_use]
72    pub fn with_max_attempts(mut self, max_attempts: i32) -> Self {
73        self.max_attempts = max_attempts;
74        self
75    }
76
77    /// Gets the maximum attempts for a job. Default 25
78    #[must_use]
79    pub fn max_attempts(&self) -> i32 {
80        self.max_attempts
81    }
82
83    /// Get the time a job was done
84    #[must_use]
85    pub fn done_at(&self) -> &Option<i64> {
86        &self.done_at
87    }
88
89    /// Set the time a job was done
90    #[must_use]
91    pub fn with_done_at(mut self, done_at: Option<i64>) -> Self {
92        self.done_at = done_at;
93        self
94    }
95
96    /// Get the time a job was locked
97    #[must_use]
98    pub fn lock_at(&self) -> &Option<i64> {
99        &self.lock_at
100    }
101
102    /// Set the lock_at value
103    #[must_use]
104    pub fn with_lock_at(mut self, lock_at: Option<i64>) -> Self {
105        self.lock_at = lock_at;
106        self
107    }
108
109    /// Get the time a job was locked
110    #[must_use]
111    pub fn lock_by(&self) -> &Option<String> {
112        &self.lock_by
113    }
114
115    /// Set `lock_by`
116    #[must_use]
117    pub fn with_lock_by(mut self, lock_by: Option<String>) -> Self {
118        self.lock_by = lock_by;
119        self
120    }
121
122    /// Get the time a job was locked
123    #[must_use]
124    pub fn last_result(&self) -> &Option<serde_json::Value> {
125        &self.last_result
126    }
127
128    /// Set the last result
129    #[must_use]
130    pub fn with_last_result(mut self, result: Option<serde_json::Value>) -> Self {
131        self.last_result = result;
132        self
133    }
134
135    /// Set the job priority. Larger values will run sooner. Default is 0.
136    #[must_use]
137    pub fn with_priority(mut self, priority: i32) -> Self {
138        self.priority = priority;
139        self
140    }
141
142    /// Get the job priority
143    #[must_use]
144    pub fn priority(&self) -> i32 {
145        self.priority
146    }
147
148    /// Get the queue name
149    #[must_use]
150    pub fn queue(&self) -> &Option<String> {
151        &self.queue
152    }
153
154    /// Set the queue name
155    #[must_use]
156    pub fn with_queue(mut self, queue: String) -> Self {
157        self.queue = Some(queue);
158        self
159    }
160
161    /// Get the metadata map
162    #[must_use]
163    pub fn meta(&self) -> &JsonMapMetadata {
164        &self.meta
165    }
166
167    /// Set the metadata map
168    #[must_use]
169    pub fn with_meta(mut self, meta: JsonMapMetadata) -> Self {
170        self.meta = meta;
171        self
172    }
173}
174
175impl<Args: Sync, IdType: Sync, Pool: Sync> FromRequest<Task<Args, Self, IdType>>
176    for SqlContext<Pool>
177{
178    type Error = Infallible;
179    async fn from_request(req: &Task<Args, Self, IdType>) -> Result<Self, Self::Error> {
180        Ok(req.parts.ctx.clone())
181    }
182}
183
184impl<T: DeserializeOwned + Serialize, Pool> MetadataExt<T> for SqlContext<Pool> {
185    type Error = serde_json::Error;
186    fn extract(&self) -> Result<T, Self::Error> {
187        self.meta
188            .get(std::any::type_name::<T>())
189            .and_then(|v| T::deserialize(v).ok())
190            .ok_or(serde_json::Error::custom("Failed to extract metadata"))
191    }
192    fn inject(&mut self, value: T) -> Result<(), Self::Error> {
193        self.meta.insert(
194            std::any::type_name::<T>().to_owned(),
195            serde_json::to_value(&value).unwrap(),
196        );
197        Ok(())
198    }
199}