forge_core/testing/context/
job.rs1#![allow(clippy::unwrap_used, clippy::indexing_slicing)]
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, RwLock};
8
9use sqlx::PgPool;
10use uuid::Uuid;
11
12use super::super::mock_http::{MockHttp, MockRequest, MockResponse};
13use super::build_test_auth;
14use crate::Result;
15use crate::env::{EnvAccess, EnvProvider, MockEnvProvider};
16use crate::function::AuthContext;
17
/// A single progress report captured by [`TestJobContext::progress`].
///
/// Derives `PartialEq`/`Eq` in addition to `Debug`/`Clone` so that tests can
/// compare recorded updates directly with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TestProgressUpdate {
    /// Completion percentage; [`TestJobContext::progress`] caps it at 100
    /// before recording.
    pub percent: u8,
    /// Status message supplied together with the percentage.
    pub message: String,
}
26
27pub struct TestJobContext {
50 pub job_id: Uuid,
52 pub job_type: String,
54 pub attempt: u32,
56 pub max_attempts: u32,
58 pub auth: AuthContext,
60 pool: Option<PgPool>,
62 http: Arc<MockHttp>,
64 progress_updates: Arc<RwLock<Vec<TestProgressUpdate>>>,
66 env_provider: Arc<MockEnvProvider>,
68 saved_data: Arc<RwLock<serde_json::Value>>,
70 cancel_requested: Arc<AtomicBool>,
72}
73
74impl TestJobContext {
75 pub fn builder(job_type: impl Into<String>) -> TestJobContextBuilder {
77 TestJobContextBuilder::new(job_type)
78 }
79
80 pub fn db(&self) -> Option<&PgPool> {
82 self.pool.as_ref()
83 }
84
85 pub fn http(&self) -> &MockHttp {
87 &self.http
88 }
89
90 pub fn progress(&self, percent: u8, message: impl Into<String>) -> Result<()> {
92 let update = TestProgressUpdate {
93 percent: percent.min(100),
94 message: message.into(),
95 };
96 self.progress_updates.write().unwrap().push(update);
97 Ok(())
98 }
99
100 pub fn progress_updates(&self) -> Vec<TestProgressUpdate> {
102 self.progress_updates.read().unwrap().clone()
103 }
104
105 pub fn saved(&self) -> serde_json::Value {
107 self.saved_data.read().unwrap().clone()
108 }
109
110 pub fn set_saved(&self, data: serde_json::Value) -> Result<()> {
112 let mut guard = self.saved_data.write().unwrap();
113 *guard = data;
114 Ok(())
115 }
116
117 pub fn save(&self, key: &str, value: serde_json::Value) -> Result<()> {
119 let mut guard = self.saved_data.write().unwrap();
120 if let Some(map) = guard.as_object_mut() {
121 map.insert(key.to_string(), value);
122 } else {
123 let mut map = serde_json::Map::new();
124 map.insert(key.to_string(), value);
125 *guard = serde_json::Value::Object(map);
126 }
127 Ok(())
128 }
129
130 pub fn is_retry(&self) -> bool {
132 self.attempt > 1
133 }
134
135 pub fn is_last_attempt(&self) -> bool {
137 self.attempt >= self.max_attempts
138 }
139
140 pub async fn heartbeat(&self) -> Result<()> {
142 Ok(())
143 }
144
145 pub fn is_cancel_requested(&self) -> Result<bool> {
147 Ok(self.cancel_requested.load(Ordering::SeqCst))
148 }
149
150 pub fn check_cancelled(&self) -> Result<()> {
154 if self.cancel_requested.load(Ordering::SeqCst) {
155 Err(crate::ForgeError::JobCancelled(
156 "Job cancellation requested".to_string(),
157 ))
158 } else {
159 Ok(())
160 }
161 }
162
163 pub fn request_cancellation(&self) {
168 self.cancel_requested.store(true, Ordering::SeqCst);
169 }
170
171 pub fn env_mock(&self) -> &MockEnvProvider {
173 &self.env_provider
174 }
175}
176
177impl EnvAccess for TestJobContext {
178 fn env_provider(&self) -> &dyn EnvProvider {
179 self.env_provider.as_ref()
180 }
181}
182
183pub struct TestJobContextBuilder {
185 job_id: Option<Uuid>,
186 job_type: String,
187 attempt: u32,
188 max_attempts: u32,
189 user_id: Option<Uuid>,
190 roles: Vec<String>,
191 claims: HashMap<String, serde_json::Value>,
192 pool: Option<PgPool>,
193 http: MockHttp,
194 env_vars: HashMap<String, String>,
195 cancel_requested: bool,
196}
197
198impl TestJobContextBuilder {
199 pub fn new(job_type: impl Into<String>) -> Self {
201 Self {
202 job_id: None,
203 job_type: job_type.into(),
204 attempt: 1,
205 max_attempts: 1,
206 user_id: None,
207 roles: Vec::new(),
208 claims: HashMap::new(),
209 pool: None,
210 http: MockHttp::new(),
211 env_vars: HashMap::new(),
212 cancel_requested: false,
213 }
214 }
215
216 pub fn with_job_id(mut self, id: Uuid) -> Self {
218 self.job_id = Some(id);
219 self
220 }
221
222 pub fn as_retry(mut self, attempt: u32) -> Self {
224 self.attempt = attempt.max(1);
225 self
226 }
227
228 pub fn with_max_attempts(mut self, max: u32) -> Self {
230 self.max_attempts = max.max(1);
231 self
232 }
233
234 pub fn as_last_attempt(mut self) -> Self {
236 self.attempt = 3;
237 self.max_attempts = 3;
238 self
239 }
240
241 pub fn as_user(mut self, id: Uuid) -> Self {
243 self.user_id = Some(id);
244 self
245 }
246
247 pub fn as_subject(mut self, subject: impl Into<String>) -> Self {
249 self.claims
250 .insert("sub".to_string(), serde_json::json!(subject.into()));
251 self
252 }
253
254 pub fn with_role(mut self, role: impl Into<String>) -> Self {
256 self.roles.push(role.into());
257 self
258 }
259
260 pub fn with_pool(mut self, pool: PgPool) -> Self {
262 self.pool = Some(pool);
263 self
264 }
265
266 pub fn mock_http<F>(self, pattern: &str, handler: F) -> Self
268 where
269 F: Fn(&MockRequest) -> MockResponse + Send + Sync + 'static,
270 {
271 self.http.add_mock_sync(pattern, handler);
272 self
273 }
274
275 pub fn mock_http_json<T: serde::Serialize>(self, pattern: &str, response: T) -> Self {
277 let json = serde_json::to_value(response).unwrap_or(serde_json::Value::Null);
278 self.mock_http(pattern, move |_| MockResponse::json(json.clone()))
279 }
280
281 pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
283 self.env_vars.insert(key.into(), value.into());
284 self
285 }
286
287 pub fn with_envs(mut self, vars: HashMap<String, String>) -> Self {
289 self.env_vars.extend(vars);
290 self
291 }
292
293 pub fn with_cancellation_requested(mut self) -> Self {
297 self.cancel_requested = true;
298 self
299 }
300
301 pub fn build(self) -> TestJobContext {
303 TestJobContext {
304 job_id: self.job_id.unwrap_or_else(Uuid::new_v4),
305 job_type: self.job_type,
306 attempt: self.attempt,
307 max_attempts: self.max_attempts,
308 auth: build_test_auth(self.user_id, self.roles, self.claims),
309 pool: self.pool,
310 http: Arc::new(self.http),
311 progress_updates: Arc::new(RwLock::new(Vec::new())),
312 env_provider: Arc::new(MockEnvProvider::with_vars(self.env_vars)),
313 saved_data: Arc::new(RwLock::new(crate::job::empty_saved_data())),
314 cancel_requested: Arc::new(AtomicBool::new(self.cancel_requested)),
315 }
316 }
317}
318
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a context for the generic `"test"` job type with all defaults.
    fn default_ctx() -> TestJobContext {
        TestJobContext::builder("test").build()
    }

    /// A freshly built context is a first, and therefore also last, attempt.
    #[test]
    fn test_job_context_creation() {
        let job = TestJobContext::builder("export_users").build();

        assert_eq!(job.job_type, "export_users");
        assert_eq!(job.attempt, 1);
        assert!(!job.is_retry());
        // Default budget is a single attempt, so attempt 1 is already the last.
        assert!(job.is_last_attempt());
    }

    /// Attempt 3 of 5 is a retry but not the final attempt.
    #[test]
    fn test_retry_detection() {
        let builder = TestJobContext::builder("test");
        let job = builder.as_retry(3).with_max_attempts(5).build();

        assert!(job.is_retry());
        assert!(!job.is_last_attempt());
    }

    /// `as_last_attempt` yields a context that is both a retry and the last attempt.
    #[test]
    fn test_last_attempt() {
        let job = TestJobContext::builder("test").as_last_attempt().build();

        assert!(job.is_retry());
        assert!(job.is_last_attempt());
    }

    /// Progress reports are recorded in call order.
    #[test]
    fn test_progress_tracking() {
        let job = default_ctx();

        job.progress(25, "Step 1 complete").unwrap();
        job.progress(50, "Step 2 complete").unwrap();
        job.progress(100, "Done").unwrap();

        let percents: Vec<u8> = job
            .progress_updates()
            .iter()
            .map(|update| update.percent)
            .collect();
        assert_eq!(percents.len(), 3);
        assert_eq!(percents[0], 25);
        assert_eq!(percents[2], 100);
    }

    /// Values stored with `save` are visible through `saved`.
    #[test]
    fn test_save_and_saved() {
        let job = default_ctx();
        job.save("charge_id", serde_json::json!("ch_123")).unwrap();
        job.save("amount", serde_json::json!(100)).unwrap();

        let snapshot = job.saved();
        assert_eq!(snapshot["charge_id"], "ch_123");
        assert_eq!(snapshot["amount"], 100);
    }

    /// Without a cancellation request both cancellation checks pass.
    #[test]
    fn test_cancellation_not_requested() {
        let job = default_ctx();

        let requested = job.is_cancel_requested().unwrap();
        assert!(!requested);
        assert!(job.check_cancelled().is_ok());
    }

    /// A builder-level cancellation request is visible immediately after build.
    #[test]
    fn test_cancellation_requested_at_build() {
        let job = TestJobContext::builder("test")
            .with_cancellation_requested()
            .build();

        let requested = job.is_cancel_requested().unwrap();
        assert!(requested);
        assert!(job.check_cancelled().is_err());
    }

    /// Requesting cancellation on a live context flips both checks.
    #[test]
    fn test_request_cancellation_mid_test() {
        let job = default_ctx();
        assert!(!job.is_cancel_requested().unwrap());

        job.request_cancellation();

        assert!(job.is_cancel_requested().unwrap());
        assert!(job.check_cancelled().is_err());
    }
}