apalis_sql/
context.rs

1use std::convert::Infallible;
2
3use apalis_core::{
4    task::{Task, metadata::MetadataExt},
5    task_fn::FromRequest,
6};
7
8type JsonMapMetadata = serde_json::Map<String, serde_json::Value>;
9
10use serde::{
11    Deserialize, Serialize,
12    de::{DeserializeOwned, Error},
13};
14
15/// The SQL context used for jobs stored in a SQL database
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct SqlContext {
18    max_attempts: i32,
19    last_result: Option<serde_json::Value>,
20    lock_at: Option<i64>,
21    lock_by: Option<String>,
22    done_at: Option<i64>,
23    priority: i32,
24    queue: Option<String>,
25    meta: JsonMapMetadata,
26}
27
28impl Default for SqlContext {
29    fn default() -> Self {
30        Self::new()
31    }
32}
33
34impl SqlContext {
35    /// Build a new context with defaults
36    #[must_use]
37    pub fn new() -> Self {
38        Self {
39            lock_at: None,
40            done_at: None,
41            max_attempts: 5,
42            last_result: None,
43            lock_by: None,
44            priority: 0,
45            queue: None,
46            meta: Default::default(),
47        }
48    }
49
50    /// Set the number of attempts
51    #[must_use]
52    pub fn with_max_attempts(mut self, max_attempts: i32) -> Self {
53        self.max_attempts = max_attempts;
54        self
55    }
56
57    /// Gets the maximum attempts for a job. Default 25
58    #[must_use]
59    pub fn max_attempts(&self) -> i32 {
60        self.max_attempts
61    }
62
63    /// Get the time a job was done
64    #[must_use]
65    pub fn done_at(&self) -> &Option<i64> {
66        &self.done_at
67    }
68
69    /// Set the time a job was done
70    #[must_use]
71    pub fn with_done_at(mut self, done_at: Option<i64>) -> Self {
72        self.done_at = done_at;
73        self
74    }
75
76    /// Get the time a job was locked
77    #[must_use]
78    pub fn lock_at(&self) -> &Option<i64> {
79        &self.lock_at
80    }
81
82    /// Set the lock_at value
83    #[must_use]
84    pub fn with_lock_at(mut self, lock_at: Option<i64>) -> Self {
85        self.lock_at = lock_at;
86        self
87    }
88
89    /// Get the time a job was locked
90    #[must_use]
91    pub fn lock_by(&self) -> &Option<String> {
92        &self.lock_by
93    }
94
95    /// Set `lock_by`
96    #[must_use]
97    pub fn with_lock_by(mut self, lock_by: Option<String>) -> Self {
98        self.lock_by = lock_by;
99        self
100    }
101
102    /// Get the time a job was locked
103    #[must_use]
104    pub fn last_result(&self) -> &Option<serde_json::Value> {
105        &self.last_result
106    }
107
108    /// Set the last result
109    #[must_use]
110    pub fn with_last_result(mut self, result: Option<serde_json::Value>) -> Self {
111        self.last_result = result;
112        self
113    }
114
115    /// Set the job priority. Larger values will run sooner. Default is 0.
116    #[must_use]
117    pub fn with_priority(mut self, priority: i32) -> Self {
118        self.priority = priority;
119        self
120    }
121
122    /// Get the job priority
123    #[must_use]
124    pub fn priority(&self) -> i32 {
125        self.priority
126    }
127
128    /// Get the queue name
129    #[must_use]
130    pub fn queue(&self) -> &Option<String> {
131        &self.queue
132    }
133
134    /// Set the queue name
135    #[must_use]
136    pub fn with_queue(mut self, queue: String) -> Self {
137        self.queue = Some(queue);
138        self
139    }
140
141    /// Get the metadata map
142    #[must_use]
143    pub fn meta(&self) -> &JsonMapMetadata {
144        &self.meta
145    }
146
147    /// Set the metadata map
148    #[must_use]
149    pub fn with_meta(mut self, meta: JsonMapMetadata) -> Self {
150        self.meta = meta;
151        self
152    }
153}
154
155impl<Args: Sync, IdType: Sync> FromRequest<Task<Args, Self, IdType>> for SqlContext {
156    type Error = Infallible;
157    async fn from_request(req: &Task<Args, Self, IdType>) -> Result<Self, Self::Error> {
158        Ok(req.parts.ctx.clone())
159    }
160}
161
162impl<T: DeserializeOwned + Serialize> MetadataExt<T> for SqlContext {
163    type Error = serde_json::Error;
164    fn extract(&self) -> Result<T, Self::Error> {
165        self.meta
166            .get(std::any::type_name::<T>())
167            .and_then(|v| T::deserialize(v).ok())
168            .ok_or(serde_json::Error::custom("Failed to extract metadata"))
169    }
170    fn inject(&mut self, value: T) -> Result<(), Self::Error> {
171        self.meta.insert(
172            std::any::type_name::<T>().to_owned(),
173            serde_json::to_value(&value).unwrap(),
174        );
175        Ok(())
176    }
177}