1use crate::Error;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use uuid::Uuid;
8
9#[async_trait]
44pub trait Job: Send + Sync + 'static {
45 async fn handle(&self) -> Result<(), Error>;
47
48 fn name(&self) -> &'static str {
50 std::any::type_name::<Self>()
51 }
52
53 fn max_retries(&self) -> u32 {
55 3
56 }
57
58 fn retry_delay(&self, attempt: u32) -> std::time::Duration {
63 use rand::Rng;
65 let base_secs: u64 = 5;
66 let cap_secs: u64 = 15 * 60;
67 let max_delay = cap_secs.min(base_secs.saturating_mul(2u64.saturating_pow(attempt)));
68 let jitter = rand::thread_rng().gen_range(0..=max_delay);
69 std::time::Duration::from_secs(jitter)
70 }
71
72 async fn failed(&self, error: &Error) {
74 tracing::error!(job = self.name(), error = %error, "Job failed permanently");
75 }
76
77 fn timeout(&self) -> std::time::Duration {
79 std::time::Duration::from_secs(60)
80 }
81
82 fn idempotency_key(&self) -> Option<String> {
87 None
88 }
89}
90
/// Serializable envelope describing one queued job.
///
/// Built by [`JobPayload::new`] / [`JobPayload::with_delay`] and converted to
/// and from JSON with [`JobPayload::to_json`] / [`JobPayload::from_json`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobPayload {
    /// Identifier of this payload; `new` assigns a random v4 UUID.
    pub id: Uuid,
    /// Job kind, taken from [`Job::name`] at construction.
    pub job_type: String,
    /// The job itself, serialized to a JSON string.
    pub data: String,
    /// Name of the queue this payload was created for.
    pub queue: String,
    /// Attempt counter; starts at `0` and is bumped by `increment_attempts`.
    pub attempts: u32,
    /// Retry budget, taken from [`Job::max_retries`] at construction.
    pub max_retries: u32,
    /// Time at which the payload was constructed.
    pub created_at: DateTime<Utc>,
    /// Earliest time at which `is_available` reports `true`.
    pub available_at: DateTime<Utc>,
    /// Time of the most recent `reserve` call, or `None` if never reserved.
    pub reserved_at: Option<DateTime<Utc>>,
    /// Optional tenant identifier.
    ///
    /// `#[serde(default)]` lets payloads serialized without this field
    /// deserialize with `None`.
    #[serde(default)]
    pub tenant_id: Option<i64>,
}
118
119impl JobPayload {
120 pub fn new<J: Job + Serialize>(job: &J, queue: &str) -> Result<Self, Error> {
122 let data =
123 serde_json::to_string(job).map_err(|e| Error::SerializationFailed(e.to_string()))?;
124
125 Ok(Self {
126 id: Uuid::new_v4(),
127 job_type: job.name().to_string(),
128 data,
129 queue: queue.to_string(),
130 attempts: 0,
131 max_retries: job.max_retries(),
132 created_at: Utc::now(),
133 available_at: Utc::now(),
134 reserved_at: None,
135 tenant_id: None,
136 })
137 }
138
139 pub fn with_delay<J: Job + Serialize>(
141 job: &J,
142 queue: &str,
143 delay: std::time::Duration,
144 ) -> Result<Self, Error> {
145 let mut payload = Self::new(job, queue)?;
146 payload.available_at = Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
147 Ok(payload)
148 }
149
150 pub fn with_tenant_id(mut self, id: Option<i64>) -> Self {
152 self.tenant_id = id;
153 self
154 }
155
156 pub fn is_available(&self) -> bool {
158 Utc::now() >= self.available_at
159 }
160
161 pub fn has_exceeded_retries(&self) -> bool {
163 self.attempts >= self.max_retries
164 }
165
166 pub fn increment_attempts(&mut self) {
168 self.attempts += 1;
169 }
170
171 pub fn reserve(&mut self) {
173 self.reserved_at = Some(Utc::now());
174 }
175
176 pub fn to_json(&self) -> Result<String, Error> {
178 serde_json::to_string(self).map_err(|e| Error::SerializationFailed(e.to_string()))
179 }
180
181 pub fn from_json(json: &str) -> Result<Self, Error> {
183 serde_json::from_str(json).map_err(|e| Error::DeserializationFailed(e.to_string()))
184 }
185}
186
#[cfg(test)]
mod tests {
    use super::*;

    /// Queue name shared by every fixture payload.
    const QUEUE: &str = "default";

    /// Minimal serializable job that always succeeds.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestJob {
        value: i32,
    }

    #[async_trait]
    impl Job for TestJob {
        async fn handle(&self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Builds an undelayed payload for `TestJob { value: 42 }` on [`QUEUE`].
    fn fixture_payload() -> JobPayload {
        let job = TestJob { value: 42 };
        JobPayload::new(&job, QUEUE).unwrap()
    }

    #[test]
    fn test_job_payload_creation() {
        let fresh = fixture_payload();

        assert_eq!(fresh.queue, "default");
        assert_eq!(fresh.attempts, 0);
        assert!(fresh.is_available());
    }

    #[test]
    fn test_job_payload_with_delay() {
        let one_minute = std::time::Duration::from_secs(60);
        let delayed = JobPayload::with_delay(&TestJob { value: 42 }, QUEUE, one_minute).unwrap();

        assert!(!delayed.is_available());
    }

    #[test]
    fn test_job_payload_serialization() {
        let original = fixture_payload();

        let encoded = original.to_json().unwrap();
        let decoded = JobPayload::from_json(&encoded).unwrap();

        assert_eq!(original.id, decoded.id);
        assert_eq!(original.queue, decoded.queue);
    }

    #[test]
    fn test_tenant_id_none_by_default() {
        assert_eq!(fixture_payload().tenant_id, None);
    }

    #[test]
    fn test_tenant_id_none_serializes_as_null() {
        let encoded = fixture_payload().to_json().unwrap();
        let tree: serde_json::Value = serde_json::from_str(&encoded).unwrap();
        assert!(tree["tenant_id"].is_null());
    }

    #[test]
    fn test_tenant_id_some_round_trips() {
        let encoded = fixture_payload()
            .with_tenant_id(Some(42))
            .to_json()
            .unwrap();
        let decoded = JobPayload::from_json(&encoded).unwrap();
        assert_eq!(decoded.tenant_id, Some(42));
    }

    #[test]
    fn test_old_payload_without_tenant_id_deserializes_to_none() {
        // Payload shape from before `tenant_id` existed: the key is absent.
        let old_json = r#"{"id":"550e8400-e29b-41d4-a716-446655440000","job_type":"test","data":"{}","queue":"default","attempts":0,"max_retries":3,"created_at":"2024-01-01T00:00:00Z","available_at":"2024-01-01T00:00:00Z","reserved_at":null}"#;
        let decoded = JobPayload::from_json(old_json).unwrap();
        assert_eq!(decoded.tenant_id, None);
    }

    #[test]
    fn test_with_tenant_id_builder() {
        let tagged = fixture_payload().with_tenant_id(Some(99));
        assert_eq!(tagged.tenant_id, Some(99));

        let untagged = fixture_payload().with_tenant_id(None);
        assert_eq!(untagged.tenant_id, None);
    }

    #[test]
    fn backoff_delay_range() {
        // (attempt, largest delay in seconds the backoff may return)
        let bounds: [(u32, u64); 3] = [(0, 5), (3, 40), (30, 900)];

        let job = TestJob { value: 0 };
        for _ in 0..100 {
            for (attempt, limit) in bounds {
                assert!(job.retry_delay(attempt).as_secs() <= limit);
            }
        }
    }

    #[test]
    fn idempotency_key_defaults_to_none() {
        let job = TestJob { value: 0 };
        assert!(job.idempotency_key().is_none());
    }
}