#![allow(clippy::unwrap_used, clippy::indexing_slicing)]
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use sqlx::PgPool;
use uuid::Uuid;
use super::super::mock_http::{MockHttp, MockRequest, MockResponse};
use super::build_test_auth;
use crate::Result;
use crate::env::{EnvAccess, EnvProvider, MockEnvProvider};
use crate::function::AuthContext;
#[derive(Debug, Clone)]
pub struct TestProgressUpdate {
pub percent: u8,
pub message: String,
}
pub struct TestJobContext {
pub job_id: Uuid,
pub job_type: String,
pub attempt: u32,
pub max_attempts: u32,
pub auth: AuthContext,
pool: Option<PgPool>,
http: Arc<MockHttp>,
progress_updates: Arc<RwLock<Vec<TestProgressUpdate>>>,
env_provider: Arc<MockEnvProvider>,
saved_data: Arc<RwLock<serde_json::Value>>,
cancel_requested: Arc<AtomicBool>,
}
impl TestJobContext {
pub fn builder(job_type: impl Into<String>) -> TestJobContextBuilder {
TestJobContextBuilder::new(job_type)
}
pub fn db(&self) -> Option<&PgPool> {
self.pool.as_ref()
}
pub fn http(&self) -> &MockHttp {
&self.http
}
pub fn progress(&self, percent: u8, message: impl Into<String>) -> Result<()> {
let update = TestProgressUpdate {
percent: percent.min(100),
message: message.into(),
};
self.progress_updates.write().unwrap().push(update);
Ok(())
}
pub fn progress_updates(&self) -> Vec<TestProgressUpdate> {
self.progress_updates.read().unwrap().clone()
}
pub fn saved(&self) -> serde_json::Value {
self.saved_data.read().unwrap().clone()
}
pub fn set_saved(&self, data: serde_json::Value) -> Result<()> {
let mut guard = self.saved_data.write().unwrap();
*guard = data;
Ok(())
}
pub fn save(&self, key: &str, value: serde_json::Value) -> Result<()> {
let mut guard = self.saved_data.write().unwrap();
if let Some(map) = guard.as_object_mut() {
map.insert(key.to_string(), value);
} else {
let mut map = serde_json::Map::new();
map.insert(key.to_string(), value);
*guard = serde_json::Value::Object(map);
}
Ok(())
}
pub fn is_retry(&self) -> bool {
self.attempt > 1
}
pub fn is_last_attempt(&self) -> bool {
self.attempt >= self.max_attempts
}
pub async fn heartbeat(&self) -> Result<()> {
Ok(())
}
pub fn is_cancel_requested(&self) -> Result<bool> {
Ok(self.cancel_requested.load(Ordering::SeqCst))
}
pub fn check_cancelled(&self) -> Result<()> {
if self.cancel_requested.load(Ordering::SeqCst) {
Err(crate::ForgeError::JobCancelled(
"Job cancellation requested".to_string(),
))
} else {
Ok(())
}
}
pub fn request_cancellation(&self) {
self.cancel_requested.store(true, Ordering::SeqCst);
}
pub fn env_mock(&self) -> &MockEnvProvider {
&self.env_provider
}
}
impl EnvAccess for TestJobContext {
fn env_provider(&self) -> &dyn EnvProvider {
self.env_provider.as_ref()
}
}
pub struct TestJobContextBuilder {
job_id: Option<Uuid>,
job_type: String,
attempt: u32,
max_attempts: u32,
user_id: Option<Uuid>,
roles: Vec<String>,
claims: HashMap<String, serde_json::Value>,
pool: Option<PgPool>,
http: MockHttp,
env_vars: HashMap<String, String>,
cancel_requested: bool,
}
impl TestJobContextBuilder {
pub fn new(job_type: impl Into<String>) -> Self {
Self {
job_id: None,
job_type: job_type.into(),
attempt: 1,
max_attempts: 1,
user_id: None,
roles: Vec::new(),
claims: HashMap::new(),
pool: None,
http: MockHttp::new(),
env_vars: HashMap::new(),
cancel_requested: false,
}
}
pub fn with_job_id(mut self, id: Uuid) -> Self {
self.job_id = Some(id);
self
}
pub fn as_retry(mut self, attempt: u32) -> Self {
self.attempt = attempt.max(1);
self
}
pub fn with_max_attempts(mut self, max: u32) -> Self {
self.max_attempts = max.max(1);
self
}
pub fn as_last_attempt(mut self) -> Self {
self.attempt = 3;
self.max_attempts = 3;
self
}
pub fn as_user(mut self, id: Uuid) -> Self {
self.user_id = Some(id);
self
}
pub fn as_subject(mut self, subject: impl Into<String>) -> Self {
self.claims
.insert("sub".to_string(), serde_json::json!(subject.into()));
self
}
pub fn with_role(mut self, role: impl Into<String>) -> Self {
self.roles.push(role.into());
self
}
pub fn with_pool(mut self, pool: PgPool) -> Self {
self.pool = Some(pool);
self
}
pub fn mock_http<F>(self, pattern: &str, handler: F) -> Self
where
F: Fn(&MockRequest) -> MockResponse + Send + Sync + 'static,
{
self.http.add_mock_sync(pattern, handler);
self
}
pub fn mock_http_json<T: serde::Serialize>(self, pattern: &str, response: T) -> Self {
let json = serde_json::to_value(response).unwrap_or(serde_json::Value::Null);
self.mock_http(pattern, move |_| MockResponse::json(json.clone()))
}
pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.env_vars.insert(key.into(), value.into());
self
}
pub fn with_envs(mut self, vars: HashMap<String, String>) -> Self {
self.env_vars.extend(vars);
self
}
pub fn with_cancellation_requested(mut self) -> Self {
self.cancel_requested = true;
self
}
pub fn build(self) -> TestJobContext {
TestJobContext {
job_id: self.job_id.unwrap_or_else(Uuid::new_v4),
job_type: self.job_type,
attempt: self.attempt,
max_attempts: self.max_attempts,
auth: build_test_auth(self.user_id, self.roles, self.claims),
pool: self.pool,
http: Arc::new(self.http),
progress_updates: Arc::new(RwLock::new(Vec::new())),
env_provider: Arc::new(MockEnvProvider::with_vars(self.env_vars)),
saved_data: Arc::new(RwLock::new(crate::job::empty_saved_data())),
cancel_requested: Arc::new(AtomicBool::new(self.cancel_requested)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_job_context_creation() {
let ctx = TestJobContext::builder("export_users").build();
assert_eq!(ctx.job_type, "export_users");
assert_eq!(ctx.attempt, 1);
assert!(!ctx.is_retry());
assert!(ctx.is_last_attempt()); }
#[test]
fn test_retry_detection() {
let ctx = TestJobContext::builder("test")
.as_retry(3)
.with_max_attempts(5)
.build();
assert!(ctx.is_retry());
assert!(!ctx.is_last_attempt());
}
#[test]
fn test_last_attempt() {
let ctx = TestJobContext::builder("test").as_last_attempt().build();
assert!(ctx.is_retry());
assert!(ctx.is_last_attempt());
}
#[test]
fn test_progress_tracking() {
let ctx = TestJobContext::builder("test").build();
ctx.progress(25, "Step 1 complete").unwrap();
ctx.progress(50, "Step 2 complete").unwrap();
ctx.progress(100, "Done").unwrap();
let updates = ctx.progress_updates();
assert_eq!(updates.len(), 3);
assert_eq!(updates[0].percent, 25);
assert_eq!(updates[2].percent, 100);
}
#[test]
fn test_save_and_saved() {
let ctx = TestJobContext::builder("test").build();
ctx.save("charge_id", serde_json::json!("ch_123")).unwrap();
ctx.save("amount", serde_json::json!(100)).unwrap();
let saved = ctx.saved();
assert_eq!(saved["charge_id"], "ch_123");
assert_eq!(saved["amount"], 100);
}
#[test]
fn test_cancellation_not_requested() {
let ctx = TestJobContext::builder("test").build();
assert!(!ctx.is_cancel_requested().unwrap());
assert!(ctx.check_cancelled().is_ok());
}
#[test]
fn test_cancellation_requested_at_build() {
let ctx = TestJobContext::builder("test")
.with_cancellation_requested()
.build();
assert!(ctx.is_cancel_requested().unwrap());
assert!(ctx.check_cancelled().is_err());
}
#[test]
fn test_request_cancellation_mid_test() {
let ctx = TestJobContext::builder("test").build();
assert!(!ctx.is_cancel_requested().unwrap());
ctx.request_cancellation();
assert!(ctx.is_cancel_requested().unwrap());
assert!(ctx.check_cancelled().is_err());
}
}