1use std::convert::Infallible;
2
3use apalis_core::{
4 task::{Task, metadata::MetadataExt},
5 task_fn::FromRequest,
6};
7
8type JsonMapMetadata = serde_json::Map<String, serde_json::Value>;
9
10use serde::{
11 Deserialize, Serialize,
12 de::{DeserializeOwned, Error},
13};
14
15#[derive(Debug, Serialize, Deserialize)]
17pub struct SqlContext<Pool> {
18 max_attempts: i32,
19 last_result: Option<serde_json::Value>,
20 lock_at: Option<i64>,
21 lock_by: Option<String>,
22 done_at: Option<i64>,
23 priority: i32,
24 queue: Option<String>,
25 meta: JsonMapMetadata,
26 _pool: std::marker::PhantomData<Pool>,
29}
30
31impl<Pool> Clone for SqlContext<Pool> {
32 fn clone(&self) -> Self {
33 Self {
34 lock_at: self.lock_at,
35 done_at: self.done_at,
36 max_attempts: self.max_attempts,
37 last_result: self.last_result.clone(),
38 lock_by: self.lock_by.clone(),
39 priority: self.priority,
40 queue: self.queue.clone(),
41 meta: self.meta.clone(),
42 _pool: std::marker::PhantomData,
43 }
44 }
45}
46
47impl<Pool> Default for SqlContext<Pool> {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53impl<Pool> SqlContext<Pool> {
54 #[must_use]
56 pub fn new() -> Self {
57 Self {
58 lock_at: None,
59 done_at: None,
60 max_attempts: 5,
61 last_result: None,
62 lock_by: None,
63 priority: 0,
64 queue: None,
65 meta: Default::default(),
66 _pool: std::marker::PhantomData,
67 }
68 }
69
70 #[must_use]
72 pub fn with_max_attempts(mut self, max_attempts: i32) -> Self {
73 self.max_attempts = max_attempts;
74 self
75 }
76
77 #[must_use]
79 pub fn max_attempts(&self) -> i32 {
80 self.max_attempts
81 }
82
83 #[must_use]
85 pub fn done_at(&self) -> &Option<i64> {
86 &self.done_at
87 }
88
89 #[must_use]
91 pub fn with_done_at(mut self, done_at: Option<i64>) -> Self {
92 self.done_at = done_at;
93 self
94 }
95
96 #[must_use]
98 pub fn lock_at(&self) -> &Option<i64> {
99 &self.lock_at
100 }
101
102 #[must_use]
104 pub fn with_lock_at(mut self, lock_at: Option<i64>) -> Self {
105 self.lock_at = lock_at;
106 self
107 }
108
109 #[must_use]
111 pub fn lock_by(&self) -> &Option<String> {
112 &self.lock_by
113 }
114
115 #[must_use]
117 pub fn with_lock_by(mut self, lock_by: Option<String>) -> Self {
118 self.lock_by = lock_by;
119 self
120 }
121
122 #[must_use]
124 pub fn last_result(&self) -> &Option<serde_json::Value> {
125 &self.last_result
126 }
127
128 #[must_use]
130 pub fn with_last_result(mut self, result: Option<serde_json::Value>) -> Self {
131 self.last_result = result;
132 self
133 }
134
135 #[must_use]
137 pub fn with_priority(mut self, priority: i32) -> Self {
138 self.priority = priority;
139 self
140 }
141
142 #[must_use]
144 pub fn priority(&self) -> i32 {
145 self.priority
146 }
147
148 #[must_use]
150 pub fn queue(&self) -> &Option<String> {
151 &self.queue
152 }
153
154 #[must_use]
156 pub fn with_queue(mut self, queue: String) -> Self {
157 self.queue = Some(queue);
158 self
159 }
160
161 #[must_use]
163 pub fn meta(&self) -> &JsonMapMetadata {
164 &self.meta
165 }
166
167 #[must_use]
169 pub fn with_meta(mut self, meta: JsonMapMetadata) -> Self {
170 self.meta = meta;
171 self
172 }
173}
174
175impl<Args: Sync, IdType: Sync, Pool: Sync> FromRequest<Task<Args, Self, IdType>>
176 for SqlContext<Pool>
177{
178 type Error = Infallible;
179 async fn from_request(req: &Task<Args, Self, IdType>) -> Result<Self, Self::Error> {
180 Ok(req.parts.ctx.clone())
181 }
182}
183
184impl<T: DeserializeOwned + Serialize, Pool> MetadataExt<T> for SqlContext<Pool> {
185 type Error = serde_json::Error;
186 fn extract(&self) -> Result<T, Self::Error> {
187 self.meta
188 .get(std::any::type_name::<T>())
189 .and_then(|v| T::deserialize(v).ok())
190 .ok_or(serde_json::Error::custom("Failed to extract metadata"))
191 }
192 fn inject(&mut self, value: T) -> Result<(), Self::Error> {
193 self.meta.insert(
194 std::any::type_name::<T>().to_owned(),
195 serde_json::to_value(&value).unwrap(),
196 );
197 Ok(())
198 }
199}