1use std::sync::{Arc, mpsc};
2
3use uuid::Uuid;
4
5use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
6use crate::function::AuthContext;
7
8pub fn empty_saved_data() -> serde_json::Value {
10 serde_json::Value::Object(serde_json::Map::new())
11}
12
13pub struct JobContext {
15 pub job_id: Uuid,
17 pub job_type: String,
19 pub attempt: u32,
21 pub max_attempts: u32,
23 pub auth: AuthContext,
25 saved_data: Arc<tokio::sync::RwLock<serde_json::Value>>,
27 db_pool: sqlx::PgPool,
29 http_client: reqwest::Client,
31 progress_tx: Option<mpsc::Sender<ProgressUpdate>>,
33 env_provider: Arc<dyn EnvProvider>,
35}
36
37#[derive(Debug, Clone)]
39pub struct ProgressUpdate {
40 pub job_id: Uuid,
42 pub percentage: u8,
44 pub message: String,
46}
47
48impl JobContext {
49 pub fn new(
51 job_id: Uuid,
52 job_type: String,
53 attempt: u32,
54 max_attempts: u32,
55 db_pool: sqlx::PgPool,
56 http_client: reqwest::Client,
57 ) -> Self {
58 Self {
59 job_id,
60 job_type,
61 attempt,
62 max_attempts,
63 auth: AuthContext::unauthenticated(),
64 saved_data: Arc::new(tokio::sync::RwLock::new(empty_saved_data())),
65 db_pool,
66 http_client,
67 progress_tx: None,
68 env_provider: Arc::new(RealEnvProvider::new()),
69 }
70 }
71
72 pub fn with_saved(mut self, data: serde_json::Value) -> Self {
74 self.saved_data = Arc::new(tokio::sync::RwLock::new(data));
75 self
76 }
77
78 pub fn with_auth(mut self, auth: AuthContext) -> Self {
80 self.auth = auth;
81 self
82 }
83
84 pub fn with_progress(mut self, tx: mpsc::Sender<ProgressUpdate>) -> Self {
86 self.progress_tx = Some(tx);
87 self
88 }
89
90 pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
92 self.env_provider = provider;
93 self
94 }
95
96 pub fn db(&self) -> &sqlx::PgPool {
98 &self.db_pool
99 }
100
101 pub fn http(&self) -> &reqwest::Client {
103 &self.http_client
104 }
105
106 pub fn progress(&self, percentage: u8, message: impl Into<String>) -> crate::Result<()> {
108 let update = ProgressUpdate {
109 job_id: self.job_id,
110 percentage: percentage.min(100),
111 message: message.into(),
112 };
113
114 if let Some(ref tx) = self.progress_tx {
115 tx.send(update)
116 .map_err(|e| crate::ForgeError::Job(format!("Failed to send progress: {}", e)))?;
117 }
118
119 Ok(())
120 }
121
122 pub async fn saved(&self) -> serde_json::Value {
127 self.saved_data.read().await.clone()
128 }
129
130 pub async fn set_saved(&self, data: serde_json::Value) -> crate::Result<()> {
135 let mut guard = self.saved_data.write().await;
136 *guard = data;
137 let persisted = Self::clone_and_drop(guard);
138 if self.job_id.is_nil() {
139 return Ok(());
140 }
141 self.persist_saved_data(persisted).await
142 }
143
144 pub async fn save(&self, key: &str, value: serde_json::Value) -> crate::Result<()> {
157 let mut guard = self.saved_data.write().await;
158 Self::apply_save(&mut guard, key, value);
159 let persisted = Self::clone_and_drop(guard);
160 if self.job_id.is_nil() {
161 return Ok(());
162 }
163 self.persist_saved_data(persisted).await
164 }
165
166 pub async fn is_cancel_requested(&self) -> crate::Result<bool> {
168 let row: Option<(String,)> = sqlx::query_as(
169 r#"
170 SELECT status
171 FROM forge_jobs
172 WHERE id = $1
173 "#,
174 )
175 .bind(self.job_id)
176 .fetch_optional(&self.db_pool)
177 .await
178 .map_err(|e| crate::ForgeError::Database(e.to_string()))?;
179
180 Ok(matches!(
181 row.as_ref().map(|(status,)| status.as_str()),
182 Some("cancel_requested") | Some("cancelled")
183 ))
184 }
185
186 pub async fn check_cancelled(&self) -> crate::Result<()> {
188 if self.is_cancel_requested().await? {
189 Err(crate::ForgeError::JobCancelled(
190 "Job cancellation requested".to_string(),
191 ))
192 } else {
193 Ok(())
194 }
195 }
196
197 async fn persist_saved_data(&self, data: serde_json::Value) -> crate::Result<()> {
198 sqlx::query(
199 r#"
200 UPDATE forge_jobs
201 SET job_context = $2
202 WHERE id = $1
203 "#,
204 )
205 .bind(self.job_id)
206 .bind(data)
207 .execute(&self.db_pool)
208 .await
209 .map_err(|e| crate::ForgeError::Database(e.to_string()))?;
210
211 Ok(())
212 }
213
214 fn apply_save(data: &mut serde_json::Value, key: &str, value: serde_json::Value) {
215 if let Some(map) = data.as_object_mut() {
216 map.insert(key.to_string(), value);
217 } else {
218 let mut map = serde_json::Map::new();
219 map.insert(key.to_string(), value);
220 *data = serde_json::Value::Object(map);
221 }
222 }
223
224 fn clone_and_drop(
225 guard: tokio::sync::RwLockWriteGuard<'_, serde_json::Value>,
226 ) -> serde_json::Value {
227 let cloned = guard.clone();
228 drop(guard);
229 cloned
230 }
231
232 pub async fn heartbeat(&self) -> crate::Result<()> {
234 sqlx::query(
235 r#"
236 UPDATE forge_jobs
237 SET last_heartbeat = NOW()
238 WHERE id = $1
239 "#,
240 )
241 .bind(self.job_id)
242 .execute(&self.db_pool)
243 .await
244 .map_err(|e| crate::ForgeError::Database(e.to_string()))?;
245
246 Ok(())
247 }
248
249 pub fn is_retry(&self) -> bool {
251 self.attempt > 1
252 }
253
254 pub fn is_last_attempt(&self) -> bool {
256 self.attempt >= self.max_attempts
257 }
258}
259
260impl EnvAccess for JobContext {
261 fn env_provider(&self) -> &dyn EnvProvider {
262 self.env_provider.as_ref()
263 }
264}
265
266#[cfg(test)]
267#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
268mod tests {
269 use super::*;
270
271 #[tokio::test]
272 async fn test_job_context_creation() {
273 let pool = sqlx::postgres::PgPoolOptions::new()
274 .max_connections(1)
275 .connect_lazy("postgres://localhost/nonexistent")
276 .expect("Failed to create mock pool");
277
278 let job_id = Uuid::new_v4();
279 let ctx = JobContext::new(
280 job_id,
281 "test_job".to_string(),
282 1,
283 3,
284 pool,
285 reqwest::Client::new(),
286 );
287
288 assert_eq!(ctx.job_id, job_id);
289 assert_eq!(ctx.job_type, "test_job");
290 assert_eq!(ctx.attempt, 1);
291 assert_eq!(ctx.max_attempts, 3);
292 assert!(!ctx.is_retry());
293 assert!(!ctx.is_last_attempt());
294 }
295
296 #[tokio::test]
297 async fn test_is_retry() {
298 let pool = sqlx::postgres::PgPoolOptions::new()
299 .max_connections(1)
300 .connect_lazy("postgres://localhost/nonexistent")
301 .expect("Failed to create mock pool");
302
303 let ctx = JobContext::new(
304 Uuid::new_v4(),
305 "test".to_string(),
306 2,
307 3,
308 pool,
309 reqwest::Client::new(),
310 );
311
312 assert!(ctx.is_retry());
313 }
314
315 #[tokio::test]
316 async fn test_is_last_attempt() {
317 let pool = sqlx::postgres::PgPoolOptions::new()
318 .max_connections(1)
319 .connect_lazy("postgres://localhost/nonexistent")
320 .expect("Failed to create mock pool");
321
322 let ctx = JobContext::new(
323 Uuid::new_v4(),
324 "test".to_string(),
325 3,
326 3,
327 pool,
328 reqwest::Client::new(),
329 );
330
331 assert!(ctx.is_last_attempt());
332 }
333
334 #[test]
335 fn test_progress_update() {
336 let update = ProgressUpdate {
337 job_id: Uuid::new_v4(),
338 percentage: 50,
339 message: "Halfway there".to_string(),
340 };
341
342 assert_eq!(update.percentage, 50);
343 assert_eq!(update.message, "Halfway there");
344 }
345
346 #[tokio::test]
347 async fn test_saved_data_in_memory() {
348 let pool = sqlx::postgres::PgPoolOptions::new()
349 .max_connections(1)
350 .connect_lazy("postgres://localhost/nonexistent")
351 .expect("Failed to create mock pool");
352
353 let ctx = JobContext::new(
354 Uuid::nil(),
355 "test_job".to_string(),
356 1,
357 3,
358 pool,
359 reqwest::Client::new(),
360 )
361 .with_saved(serde_json::json!({"foo": "bar"}));
362
363 let saved = ctx.saved().await;
364 assert_eq!(saved["foo"], "bar");
365 }
366
367 #[tokio::test]
368 async fn test_save_key_value() {
369 let pool = sqlx::postgres::PgPoolOptions::new()
370 .max_connections(1)
371 .connect_lazy("postgres://localhost/nonexistent")
372 .expect("Failed to create mock pool");
373
374 let ctx = JobContext::new(
375 Uuid::nil(),
376 "test_job".to_string(),
377 1,
378 3,
379 pool,
380 reqwest::Client::new(),
381 );
382
383 ctx.save("charge_id", serde_json::json!("ch_123"))
384 .await
385 .unwrap();
386 ctx.save("amount", serde_json::json!(100)).await.unwrap();
387
388 let saved = ctx.saved().await;
389 assert_eq!(saved["charge_id"], "ch_123");
390 assert_eq!(saved["amount"], 100);
391 }
392}