apalis_sql/
context.rs

1use std::convert::Infallible;
2
3use apalis_core::{
4    task::{Task, metadata::MetadataExt},
5    task_fn::FromRequest,
6};
7
8type JsonMapMetadata = serde_json::Map<String, serde_json::Value>;
9
10use serde::{
11    Deserialize, Serialize,
12    de::{DeserializeOwned, Error},
13};
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct SqlContext {
17    max_attempts: i32,
18    last_result: Option<serde_json::Value>,
19    lock_at: Option<i64>,
20    lock_by: Option<String>,
21    done_at: Option<i64>,
22    priority: i32,
23    queue: Option<String>,
24    meta: JsonMapMetadata,
25}
26
27impl Default for SqlContext {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33impl SqlContext {
34    /// Build a new context with defaults
35    pub fn new() -> Self {
36        SqlContext {
37            lock_at: None,
38            done_at: None,
39            max_attempts: 5,
40            last_result: None,
41            lock_by: None,
42            priority: 0,
43            queue: None,
44            meta: Default::default(),
45        }
46    }
47
48    /// Set the number of attempts
49    pub fn with_max_attempts(mut self, max_attempts: i32) -> Self {
50        self.max_attempts = max_attempts;
51        self
52    }
53
54    /// Gets the maximum attempts for a job. Default 25
55    pub fn max_attempts(&self) -> i32 {
56        self.max_attempts
57    }
58
59    /// Get the time a job was done
60    pub fn done_at(&self) -> &Option<i64> {
61        &self.done_at
62    }
63
64    /// Set the time a job was done
65    pub fn with_done_at(mut self, done_at: Option<i64>) -> Self {
66        self.done_at = done_at;
67        self
68    }
69
70    /// Get the time a job was locked
71    pub fn lock_at(&self) -> &Option<i64> {
72        &self.lock_at
73    }
74
75    /// Set the lock_at value
76    pub fn with_lock_at(mut self, lock_at: Option<i64>) -> Self {
77        self.lock_at = lock_at;
78        self
79    }
80
81    /// Get the time a job was locked
82    pub fn lock_by(&self) -> &Option<String> {
83        &self.lock_by
84    }
85
86    /// Set `lock_by`
87    pub fn with_lock_by(mut self, lock_by: Option<String>) -> Self {
88        self.lock_by = lock_by;
89        self
90    }
91
92    /// Get the time a job was locked
93    pub fn last_result(&self) -> &Option<serde_json::Value> {
94        &self.last_result
95    }
96
97    /// Set the last result
98    pub fn with_last_result(mut self, result: Option<serde_json::Value>) -> Self {
99        self.last_result = result;
100        self
101    }
102
103    /// Set the job priority. Larger values will run sooner. Default is 0.
104    pub fn with_priority(mut self, priority: i32) -> Self {
105        self.priority = priority;
106        self
107    }
108
109    /// Get the job priority
110    pub fn priority(&self) -> i32 {
111        self.priority
112    }
113
114    pub fn queue(&self) -> &Option<String> {
115        &self.queue
116    }
117
118    pub fn with_queue(mut self, queue: String) -> Self {
119        self.queue = Some(queue);
120        self
121    }
122
123    pub fn meta(&self) -> &JsonMapMetadata {
124        &self.meta
125    }
126
127    pub fn with_meta(mut self, meta: JsonMapMetadata) -> Self {
128        self.meta = meta;
129        self
130    }
131}
132
133impl<Args: Sync, IdType: Sync> FromRequest<Task<Args, Self, IdType>> for SqlContext {
134    type Error = Infallible;
135    async fn from_request(req: &Task<Args, Self, IdType>) -> Result<Self, Self::Error> {
136        Ok(req.parts.ctx.clone())
137    }
138}
139
140impl<T: DeserializeOwned + Serialize> MetadataExt<T> for SqlContext {
141    type Error = serde_json::Error;
142    fn extract(&self) -> Result<T, Self::Error> {
143        self.meta
144            .get(std::any::type_name::<T>())
145            .and_then(|v| T::deserialize(v).ok())
146            .ok_or(serde_json::Error::custom("Failed to extract metadata"))
147    }
148    fn inject(&mut self, value: T) -> Result<(), Self::Error> {
149        self.meta.insert(
150            std::any::type_name::<T>().to_string(),
151            serde_json::to_value(&value).unwrap(),
152        );
153        Ok(())
154    }
155}