1use std::convert::Infallible;
2
3use apalis_core::{
4 task::{Task, metadata::MetadataExt},
5 task_fn::FromRequest,
6};
7
8type JsonMapMetadata = serde_json::Map<String, serde_json::Value>;
9
10use serde::{
11 Deserialize, Serialize,
12 de::{DeserializeOwned, Error},
13};
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct SqlContext {
18 max_attempts: i32,
19 last_result: Option<serde_json::Value>,
20 lock_at: Option<i64>,
21 lock_by: Option<String>,
22 done_at: Option<i64>,
23 priority: i32,
24 queue: Option<String>,
25 meta: JsonMapMetadata,
26}
27
28impl Default for SqlContext {
29 fn default() -> Self {
30 Self::new()
31 }
32}
33
34impl SqlContext {
35 #[must_use]
37 pub fn new() -> Self {
38 Self {
39 lock_at: None,
40 done_at: None,
41 max_attempts: 5,
42 last_result: None,
43 lock_by: None,
44 priority: 0,
45 queue: None,
46 meta: Default::default(),
47 }
48 }
49
50 #[must_use]
52 pub fn with_max_attempts(mut self, max_attempts: i32) -> Self {
53 self.max_attempts = max_attempts;
54 self
55 }
56
57 #[must_use]
59 pub fn max_attempts(&self) -> i32 {
60 self.max_attempts
61 }
62
63 #[must_use]
65 pub fn done_at(&self) -> &Option<i64> {
66 &self.done_at
67 }
68
69 #[must_use]
71 pub fn with_done_at(mut self, done_at: Option<i64>) -> Self {
72 self.done_at = done_at;
73 self
74 }
75
76 #[must_use]
78 pub fn lock_at(&self) -> &Option<i64> {
79 &self.lock_at
80 }
81
82 #[must_use]
84 pub fn with_lock_at(mut self, lock_at: Option<i64>) -> Self {
85 self.lock_at = lock_at;
86 self
87 }
88
89 #[must_use]
91 pub fn lock_by(&self) -> &Option<String> {
92 &self.lock_by
93 }
94
95 #[must_use]
97 pub fn with_lock_by(mut self, lock_by: Option<String>) -> Self {
98 self.lock_by = lock_by;
99 self
100 }
101
102 #[must_use]
104 pub fn last_result(&self) -> &Option<serde_json::Value> {
105 &self.last_result
106 }
107
108 #[must_use]
110 pub fn with_last_result(mut self, result: Option<serde_json::Value>) -> Self {
111 self.last_result = result;
112 self
113 }
114
115 #[must_use]
117 pub fn with_priority(mut self, priority: i32) -> Self {
118 self.priority = priority;
119 self
120 }
121
122 #[must_use]
124 pub fn priority(&self) -> i32 {
125 self.priority
126 }
127
128 #[must_use]
130 pub fn queue(&self) -> &Option<String> {
131 &self.queue
132 }
133
134 #[must_use]
136 pub fn with_queue(mut self, queue: String) -> Self {
137 self.queue = Some(queue);
138 self
139 }
140
141 #[must_use]
143 pub fn meta(&self) -> &JsonMapMetadata {
144 &self.meta
145 }
146
147 #[must_use]
149 pub fn with_meta(mut self, meta: JsonMapMetadata) -> Self {
150 self.meta = meta;
151 self
152 }
153}
154
155impl<Args: Sync, IdType: Sync> FromRequest<Task<Args, Self, IdType>> for SqlContext {
156 type Error = Infallible;
157 async fn from_request(req: &Task<Args, Self, IdType>) -> Result<Self, Self::Error> {
158 Ok(req.parts.ctx.clone())
159 }
160}
161
162impl<T: DeserializeOwned + Serialize> MetadataExt<T> for SqlContext {
163 type Error = serde_json::Error;
164 fn extract(&self) -> Result<T, Self::Error> {
165 self.meta
166 .get(std::any::type_name::<T>())
167 .and_then(|v| T::deserialize(v).ok())
168 .ok_or(serde_json::Error::custom("Failed to extract metadata"))
169 }
170 fn inject(&mut self, value: T) -> Result<(), Self::Error> {
171 self.meta.insert(
172 std::any::type_name::<T>().to_owned(),
173 serde_json::to_value(&value).unwrap(),
174 );
175 Ok(())
176 }
177}