1use crate::Error;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use uuid::Uuid;
8
9#[async_trait]
44pub trait Job: Send + Sync + 'static {
45 async fn handle(&self) -> Result<(), Error>;
47
48 fn name(&self) -> &'static str {
50 std::any::type_name::<Self>()
51 }
52
53 fn max_retries(&self) -> u32 {
55 3
56 }
57
58 fn retry_delay(&self, _attempt: u32) -> std::time::Duration {
60 std::time::Duration::from_secs(5)
61 }
62
63 async fn failed(&self, error: &Error) {
65 tracing::error!(job = self.name(), error = %error, "Job failed permanently");
66 }
67
68 fn timeout(&self) -> std::time::Duration {
70 std::time::Duration::from_secs(60)
71 }
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct JobPayload {
77 pub id: Uuid,
79 pub job_type: String,
81 pub data: String,
83 pub queue: String,
85 pub attempts: u32,
87 pub max_retries: u32,
89 pub created_at: DateTime<Utc>,
91 pub available_at: DateTime<Utc>,
93 pub reserved_at: Option<DateTime<Utc>>,
95 #[serde(default)]
99 pub tenant_id: Option<i64>,
100}
101
102impl JobPayload {
103 pub fn new<J: Job + Serialize>(job: &J, queue: &str) -> Result<Self, Error> {
105 let data =
106 serde_json::to_string(job).map_err(|e| Error::SerializationFailed(e.to_string()))?;
107
108 Ok(Self {
109 id: Uuid::new_v4(),
110 job_type: job.name().to_string(),
111 data,
112 queue: queue.to_string(),
113 attempts: 0,
114 max_retries: job.max_retries(),
115 created_at: Utc::now(),
116 available_at: Utc::now(),
117 reserved_at: None,
118 tenant_id: None,
119 })
120 }
121
122 pub fn with_delay<J: Job + Serialize>(
124 job: &J,
125 queue: &str,
126 delay: std::time::Duration,
127 ) -> Result<Self, Error> {
128 let mut payload = Self::new(job, queue)?;
129 payload.available_at = Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
130 Ok(payload)
131 }
132
133 pub fn with_tenant_id(mut self, id: Option<i64>) -> Self {
135 self.tenant_id = id;
136 self
137 }
138
139 pub fn is_available(&self) -> bool {
141 Utc::now() >= self.available_at
142 }
143
144 pub fn has_exceeded_retries(&self) -> bool {
146 self.attempts >= self.max_retries
147 }
148
149 pub fn increment_attempts(&mut self) {
151 self.attempts += 1;
152 }
153
154 pub fn reserve(&mut self) {
156 self.reserved_at = Some(Utc::now());
157 }
158
159 pub fn to_json(&self) -> Result<String, Error> {
161 serde_json::to_string(self).map_err(|e| Error::SerializationFailed(e.to_string()))
162 }
163
164 pub fn from_json(json: &str) -> Result<Self, Error> {
166 serde_json::from_str(json).map_err(|e| Error::DeserializationFailed(e.to_string()))
167 }
168}
169
170#[cfg(test)]
171mod tests {
172 use super::*;
173
174 #[derive(Debug, Clone, Serialize, Deserialize)]
175 struct TestJob {
176 value: i32,
177 }
178
179 #[async_trait]
180 impl Job for TestJob {
181 async fn handle(&self) -> Result<(), Error> {
182 Ok(())
183 }
184 }
185
186 #[test]
187 fn test_job_payload_creation() {
188 let job = TestJob { value: 42 };
189 let payload = JobPayload::new(&job, "default").unwrap();
190
191 assert_eq!(payload.queue, "default");
192 assert_eq!(payload.attempts, 0);
193 assert!(payload.is_available());
194 }
195
196 #[test]
197 fn test_job_payload_with_delay() {
198 let job = TestJob { value: 42 };
199 let payload =
200 JobPayload::with_delay(&job, "default", std::time::Duration::from_secs(60)).unwrap();
201
202 assert!(!payload.is_available());
203 }
204
205 #[test]
206 fn test_job_payload_serialization() {
207 let job = TestJob { value: 42 };
208 let payload = JobPayload::new(&job, "default").unwrap();
209
210 let json = payload.to_json().unwrap();
211 let restored = JobPayload::from_json(&json).unwrap();
212
213 assert_eq!(payload.id, restored.id);
214 assert_eq!(payload.queue, restored.queue);
215 }
216
217 #[test]
218 fn test_tenant_id_none_by_default() {
219 let job = TestJob { value: 42 };
220 let payload = JobPayload::new(&job, "default").unwrap();
221 assert_eq!(payload.tenant_id, None);
222 }
223
224 #[test]
225 fn test_tenant_id_none_serializes_as_null() {
226 let job = TestJob { value: 42 };
227 let payload = JobPayload::new(&job, "default").unwrap();
228 let json = payload.to_json().unwrap();
229 let val: serde_json::Value = serde_json::from_str(&json).unwrap();
230 assert_eq!(val["tenant_id"], serde_json::Value::Null);
231 }
232
233 #[test]
234 fn test_tenant_id_some_round_trips() {
235 let job = TestJob { value: 42 };
236 let payload = JobPayload::new(&job, "default")
237 .unwrap()
238 .with_tenant_id(Some(42));
239 let json = payload.to_json().unwrap();
240 let restored = JobPayload::from_json(&json).unwrap();
241 assert_eq!(restored.tenant_id, Some(42));
242 }
243
244 #[test]
245 fn test_old_payload_without_tenant_id_deserializes_to_none() {
246 let old_json = r#"{"id":"550e8400-e29b-41d4-a716-446655440000","job_type":"test","data":"{}","queue":"default","attempts":0,"max_retries":3,"created_at":"2024-01-01T00:00:00Z","available_at":"2024-01-01T00:00:00Z","reserved_at":null}"#;
248 let payload = JobPayload::from_json(old_json).unwrap();
249 assert_eq!(payload.tenant_id, None);
250 }
251
252 #[test]
253 fn test_with_tenant_id_builder() {
254 let job = TestJob { value: 42 };
255 let payload = JobPayload::new(&job, "default")
256 .unwrap()
257 .with_tenant_id(Some(99));
258 assert_eq!(payload.tenant_id, Some(99));
259
260 let payload_none = JobPayload::new(&job, "default")
261 .unwrap()
262 .with_tenant_id(None);
263 assert_eq!(payload_none.tenant_id, None);
264 }
265}