forge_core/testing/context/
job.rs1use std::collections::HashMap;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, RwLock};
6
7use sqlx::PgPool;
8use uuid::Uuid;
9
10use super::super::mock_http::{MockHttp, MockRequest, MockResponse};
11use super::build_test_auth;
12use crate::Result;
13use crate::env::{EnvAccess, EnvProvider, MockEnvProvider};
14use crate::function::AuthContext;
15
16#[derive(Debug, Clone)]
18pub struct TestProgressUpdate {
19 pub percent: u8,
21 pub message: String,
23}
24
25pub struct TestJobContext {
48 pub job_id: Uuid,
50 pub job_type: String,
52 pub attempt: u32,
54 pub max_attempts: u32,
56 pub auth: AuthContext,
58 pool: Option<PgPool>,
60 http: Arc<MockHttp>,
62 progress_updates: Arc<RwLock<Vec<TestProgressUpdate>>>,
64 env_provider: Arc<MockEnvProvider>,
66 saved_data: Arc<RwLock<serde_json::Value>>,
68 cancel_requested: Arc<AtomicBool>,
70}
71
72impl TestJobContext {
73 pub fn builder(job_type: impl Into<String>) -> TestJobContextBuilder {
75 TestJobContextBuilder::new(job_type)
76 }
77
78 pub fn db(&self) -> Option<&PgPool> {
80 self.pool.as_ref()
81 }
82
83 pub fn http(&self) -> &MockHttp {
85 &self.http
86 }
87
88 pub fn progress(&self, percent: u8, message: impl Into<String>) -> Result<()> {
90 let update = TestProgressUpdate {
91 percent: percent.min(100),
92 message: message.into(),
93 };
94 self.progress_updates.write().unwrap().push(update);
95 Ok(())
96 }
97
98 pub fn progress_updates(&self) -> Vec<TestProgressUpdate> {
100 self.progress_updates.read().unwrap().clone()
101 }
102
103 pub fn saved(&self) -> serde_json::Value {
105 self.saved_data.read().unwrap().clone()
106 }
107
108 pub fn set_saved(&self, data: serde_json::Value) -> Result<()> {
110 let mut guard = self.saved_data.write().unwrap();
111 *guard = data;
112 Ok(())
113 }
114
115 pub fn save(&self, key: &str, value: serde_json::Value) -> Result<()> {
117 let mut guard = self.saved_data.write().unwrap();
118 if let Some(map) = guard.as_object_mut() {
119 map.insert(key.to_string(), value);
120 } else {
121 let mut map = serde_json::Map::new();
122 map.insert(key.to_string(), value);
123 *guard = serde_json::Value::Object(map);
124 }
125 Ok(())
126 }
127
128 pub fn is_retry(&self) -> bool {
130 self.attempt > 1
131 }
132
133 pub fn is_last_attempt(&self) -> bool {
135 self.attempt >= self.max_attempts
136 }
137
138 pub async fn heartbeat(&self) -> Result<()> {
140 Ok(())
141 }
142
143 pub fn is_cancel_requested(&self) -> Result<bool> {
145 Ok(self.cancel_requested.load(Ordering::SeqCst))
146 }
147
148 pub fn check_cancelled(&self) -> Result<()> {
152 if self.cancel_requested.load(Ordering::SeqCst) {
153 Err(crate::ForgeError::JobCancelled(
154 "Job cancellation requested".to_string(),
155 ))
156 } else {
157 Ok(())
158 }
159 }
160
161 pub fn request_cancellation(&self) {
166 self.cancel_requested.store(true, Ordering::SeqCst);
167 }
168
169 pub fn env_mock(&self) -> &MockEnvProvider {
171 &self.env_provider
172 }
173}
174
175impl EnvAccess for TestJobContext {
176 fn env_provider(&self) -> &dyn EnvProvider {
177 self.env_provider.as_ref()
178 }
179}
180
181pub struct TestJobContextBuilder {
183 job_id: Option<Uuid>,
184 job_type: String,
185 attempt: u32,
186 max_attempts: u32,
187 user_id: Option<Uuid>,
188 roles: Vec<String>,
189 claims: HashMap<String, serde_json::Value>,
190 pool: Option<PgPool>,
191 http: MockHttp,
192 env_vars: HashMap<String, String>,
193 cancel_requested: bool,
194}
195
196impl TestJobContextBuilder {
197 pub fn new(job_type: impl Into<String>) -> Self {
199 Self {
200 job_id: None,
201 job_type: job_type.into(),
202 attempt: 1,
203 max_attempts: 1,
204 user_id: None,
205 roles: Vec::new(),
206 claims: HashMap::new(),
207 pool: None,
208 http: MockHttp::new(),
209 env_vars: HashMap::new(),
210 cancel_requested: false,
211 }
212 }
213
214 pub fn with_job_id(mut self, id: Uuid) -> Self {
216 self.job_id = Some(id);
217 self
218 }
219
220 pub fn as_retry(mut self, attempt: u32) -> Self {
222 self.attempt = attempt.max(1);
223 self
224 }
225
226 pub fn with_max_attempts(mut self, max: u32) -> Self {
228 self.max_attempts = max.max(1);
229 self
230 }
231
232 pub fn as_last_attempt(mut self) -> Self {
234 self.attempt = 3;
235 self.max_attempts = 3;
236 self
237 }
238
239 pub fn as_user(mut self, id: Uuid) -> Self {
241 self.user_id = Some(id);
242 self
243 }
244
245 pub fn as_subject(mut self, subject: impl Into<String>) -> Self {
247 self.claims
248 .insert("sub".to_string(), serde_json::json!(subject.into()));
249 self
250 }
251
252 pub fn with_role(mut self, role: impl Into<String>) -> Self {
254 self.roles.push(role.into());
255 self
256 }
257
258 pub fn with_pool(mut self, pool: PgPool) -> Self {
260 self.pool = Some(pool);
261 self
262 }
263
264 pub fn mock_http<F>(self, pattern: &str, handler: F) -> Self
266 where
267 F: Fn(&MockRequest) -> MockResponse + Send + Sync + 'static,
268 {
269 self.http.add_mock_sync(pattern, handler);
270 self
271 }
272
273 pub fn mock_http_json<T: serde::Serialize>(self, pattern: &str, response: T) -> Self {
275 let json = serde_json::to_value(response).unwrap_or(serde_json::Value::Null);
276 self.mock_http(pattern, move |_| MockResponse::json(json.clone()))
277 }
278
279 pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
281 self.env_vars.insert(key.into(), value.into());
282 self
283 }
284
285 pub fn with_envs(mut self, vars: HashMap<String, String>) -> Self {
287 self.env_vars.extend(vars);
288 self
289 }
290
291 pub fn with_cancellation_requested(mut self) -> Self {
295 self.cancel_requested = true;
296 self
297 }
298
299 pub fn build(self) -> TestJobContext {
301 TestJobContext {
302 job_id: self.job_id.unwrap_or_else(Uuid::new_v4),
303 job_type: self.job_type,
304 attempt: self.attempt,
305 max_attempts: self.max_attempts,
306 auth: build_test_auth(self.user_id, self.roles, self.claims),
307 pool: self.pool,
308 http: Arc::new(self.http),
309 progress_updates: Arc::new(RwLock::new(Vec::new())),
310 env_provider: Arc::new(MockEnvProvider::with_vars(self.env_vars)),
311 saved_data: Arc::new(RwLock::new(crate::job::empty_saved_data())),
312 cancel_requested: Arc::new(AtomicBool::new(self.cancel_requested)),
313 }
314 }
315}
316
317#[cfg(test)]
318mod tests {
319 use super::*;
320
321 #[test]
322 fn test_job_context_creation() {
323 let ctx = TestJobContext::builder("export_users").build();
324
325 assert_eq!(ctx.job_type, "export_users");
326 assert_eq!(ctx.attempt, 1);
327 assert!(!ctx.is_retry());
328 assert!(ctx.is_last_attempt()); }
330
331 #[test]
332 fn test_retry_detection() {
333 let ctx = TestJobContext::builder("test")
334 .as_retry(3)
335 .with_max_attempts(5)
336 .build();
337
338 assert!(ctx.is_retry());
339 assert!(!ctx.is_last_attempt());
340 }
341
342 #[test]
343 fn test_last_attempt() {
344 let ctx = TestJobContext::builder("test").as_last_attempt().build();
345
346 assert!(ctx.is_retry());
347 assert!(ctx.is_last_attempt());
348 }
349
350 #[test]
351 fn test_progress_tracking() {
352 let ctx = TestJobContext::builder("test").build();
353
354 ctx.progress(25, "Step 1 complete").unwrap();
355 ctx.progress(50, "Step 2 complete").unwrap();
356 ctx.progress(100, "Done").unwrap();
357
358 let updates = ctx.progress_updates();
359 assert_eq!(updates.len(), 3);
360 assert_eq!(updates[0].percent, 25);
361 assert_eq!(updates[2].percent, 100);
362 }
363
364 #[test]
365 fn test_save_and_saved() {
366 let ctx = TestJobContext::builder("test").build();
367 ctx.save("charge_id", serde_json::json!("ch_123")).unwrap();
368 ctx.save("amount", serde_json::json!(100)).unwrap();
369
370 let saved = ctx.saved();
371 assert_eq!(saved["charge_id"], "ch_123");
372 assert_eq!(saved["amount"], 100);
373 }
374
375 #[test]
376 fn test_cancellation_not_requested() {
377 let ctx = TestJobContext::builder("test").build();
378
379 assert!(!ctx.is_cancel_requested().unwrap());
380 assert!(ctx.check_cancelled().is_ok());
381 }
382
383 #[test]
384 fn test_cancellation_requested_at_build() {
385 let ctx = TestJobContext::builder("test")
386 .with_cancellation_requested()
387 .build();
388
389 assert!(ctx.is_cancel_requested().unwrap());
390 assert!(ctx.check_cancelled().is_err());
391 }
392
393 #[test]
394 fn test_request_cancellation_mid_test() {
395 let ctx = TestJobContext::builder("test").build();
396
397 assert!(!ctx.is_cancel_requested().unwrap());
398 ctx.request_cancellation();
399 assert!(ctx.is_cancel_requested().unwrap());
400 assert!(ctx.check_cancelled().is_err());
401 }
402}