1use std::convert::Infallible;
2
3use apalis_core::{
4 task::{Task, metadata::MetadataExt},
5 task_fn::FromRequest,
6};
7
8type JsonMapMetadata = serde_json::Map<String, serde_json::Value>;
9
10use serde::{
11 Deserialize, Serialize,
12 de::{DeserializeOwned, Error},
13};
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct SqlContext {
17 max_attempts: i32,
18 last_result: Option<serde_json::Value>,
19 lock_at: Option<i64>,
20 lock_by: Option<String>,
21 done_at: Option<i64>,
22 priority: i32,
23 queue: Option<String>,
24 meta: JsonMapMetadata,
25}
26
27impl Default for SqlContext {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl SqlContext {
34 pub fn new() -> Self {
36 SqlContext {
37 lock_at: None,
38 done_at: None,
39 max_attempts: 5,
40 last_result: None,
41 lock_by: None,
42 priority: 0,
43 queue: None,
44 meta: Default::default(),
45 }
46 }
47
48 pub fn with_max_attempts(mut self, max_attempts: i32) -> Self {
50 self.max_attempts = max_attempts;
51 self
52 }
53
54 pub fn max_attempts(&self) -> i32 {
56 self.max_attempts
57 }
58
59 pub fn done_at(&self) -> &Option<i64> {
61 &self.done_at
62 }
63
64 pub fn with_done_at(mut self, done_at: Option<i64>) -> Self {
66 self.done_at = done_at;
67 self
68 }
69
70 pub fn lock_at(&self) -> &Option<i64> {
72 &self.lock_at
73 }
74
75 pub fn with_lock_at(mut self, lock_at: Option<i64>) -> Self {
77 self.lock_at = lock_at;
78 self
79 }
80
81 pub fn lock_by(&self) -> &Option<String> {
83 &self.lock_by
84 }
85
86 pub fn with_lock_by(mut self, lock_by: Option<String>) -> Self {
88 self.lock_by = lock_by;
89 self
90 }
91
92 pub fn last_result(&self) -> &Option<serde_json::Value> {
94 &self.last_result
95 }
96
97 pub fn with_last_result(mut self, result: Option<serde_json::Value>) -> Self {
99 self.last_result = result;
100 self
101 }
102
103 pub fn with_priority(mut self, priority: i32) -> Self {
105 self.priority = priority;
106 self
107 }
108
109 pub fn priority(&self) -> i32 {
111 self.priority
112 }
113
114 pub fn queue(&self) -> &Option<String> {
115 &self.queue
116 }
117
118 pub fn with_queue(mut self, queue: String) -> Self {
119 self.queue = Some(queue);
120 self
121 }
122
123 pub fn meta(&self) -> &JsonMapMetadata {
124 &self.meta
125 }
126
127 pub fn with_meta(mut self, meta: JsonMapMetadata) -> Self {
128 self.meta = meta;
129 self
130 }
131}
132
133impl<Args: Sync, IdType: Sync> FromRequest<Task<Args, Self, IdType>> for SqlContext {
134 type Error = Infallible;
135 async fn from_request(req: &Task<Args, Self, IdType>) -> Result<Self, Self::Error> {
136 Ok(req.parts.ctx.clone())
137 }
138}
139
140impl<T: DeserializeOwned + Serialize> MetadataExt<T> for SqlContext {
141 type Error = serde_json::Error;
142 fn extract(&self) -> Result<T, Self::Error> {
143 self.meta
144 .get(std::any::type_name::<T>())
145 .and_then(|v| T::deserialize(v).ok())
146 .ok_or(serde_json::Error::custom("Failed to extract metadata"))
147 }
148 fn inject(&mut self, value: T) -> Result<(), Self::Error> {
149 self.meta.insert(
150 std::any::type_name::<T>().to_string(),
151 serde_json::to_value(&value).unwrap(),
152 );
153 Ok(())
154 }
155}