Skip to main content

forge_core/job/
context.rs

1use std::sync::{Arc, mpsc};
2
3use uuid::Uuid;
4
5use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
6use crate::function::AuthContext;
7
8/// Context available to job handlers.
9pub struct JobContext {
10    /// Job ID.
11    pub job_id: Uuid,
12    /// Job type/name.
13    pub job_type: String,
14    /// Current attempt number (1-based).
15    pub attempt: u32,
16    /// Maximum attempts allowed.
17    pub max_attempts: u32,
18    /// Authentication context (for queries/mutations).
19    pub auth: AuthContext,
20    /// Database pool.
21    db_pool: sqlx::PgPool,
22    /// HTTP client for external calls.
23    http_client: reqwest::Client,
24    /// Progress reporter (sync channel for simplicity).
25    progress_tx: Option<mpsc::Sender<ProgressUpdate>>,
26    /// Environment variable provider.
27    env_provider: Arc<dyn EnvProvider>,
28}
29
30/// Progress update message.
31#[derive(Debug, Clone)]
32pub struct ProgressUpdate {
33    /// Job ID.
34    pub job_id: Uuid,
35    /// Progress percentage (0-100).
36    pub percentage: u8,
37    /// Status message.
38    pub message: String,
39}
40
41impl JobContext {
42    /// Create a new job context.
43    pub fn new(
44        job_id: Uuid,
45        job_type: String,
46        attempt: u32,
47        max_attempts: u32,
48        db_pool: sqlx::PgPool,
49        http_client: reqwest::Client,
50    ) -> Self {
51        Self {
52            job_id,
53            job_type,
54            attempt,
55            max_attempts,
56            auth: AuthContext::unauthenticated(),
57            db_pool,
58            http_client,
59            progress_tx: None,
60            env_provider: Arc::new(RealEnvProvider::new()),
61        }
62    }
63
64    /// Set authentication context.
65    pub fn with_auth(mut self, auth: AuthContext) -> Self {
66        self.auth = auth;
67        self
68    }
69
70    /// Set progress channel.
71    pub fn with_progress(mut self, tx: mpsc::Sender<ProgressUpdate>) -> Self {
72        self.progress_tx = Some(tx);
73        self
74    }
75
76    /// Set environment provider.
77    pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
78        self.env_provider = provider;
79        self
80    }
81
82    /// Get database pool.
83    pub fn db(&self) -> &sqlx::PgPool {
84        &self.db_pool
85    }
86
87    /// Get HTTP client.
88    pub fn http(&self) -> &reqwest::Client {
89        &self.http_client
90    }
91
92    /// Report job progress.
93    pub fn progress(&self, percentage: u8, message: impl Into<String>) -> crate::Result<()> {
94        let update = ProgressUpdate {
95            job_id: self.job_id,
96            percentage: percentage.min(100),
97            message: message.into(),
98        };
99
100        if let Some(ref tx) = self.progress_tx {
101            tx.send(update)
102                .map_err(|e| crate::ForgeError::Job(format!("Failed to send progress: {}", e)))?;
103        }
104
105        Ok(())
106    }
107
108    /// Send heartbeat to keep job alive (async).
109    pub async fn heartbeat(&self) -> crate::Result<()> {
110        sqlx::query(
111            r#"
112            UPDATE forge_jobs
113            SET last_heartbeat = NOW()
114            WHERE id = $1
115            "#,
116        )
117        .bind(self.job_id)
118        .execute(&self.db_pool)
119        .await
120        .map_err(|e| crate::ForgeError::Database(e.to_string()))?;
121
122        Ok(())
123    }
124
125    /// Check if this is a retry attempt.
126    pub fn is_retry(&self) -> bool {
127        self.attempt > 1
128    }
129
130    /// Check if this is the last attempt.
131    pub fn is_last_attempt(&self) -> bool {
132        self.attempt >= self.max_attempts
133    }
134}
135
136impl EnvAccess for JobContext {
137    fn env_provider(&self) -> &dyn EnvProvider {
138        self.env_provider.as_ref()
139    }
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// Pool created with `connect_lazy`, so building it does not require a
    /// reachable database. Called from inside the `#[tokio::test]` bodies.
    fn lazy_pool() -> sqlx::PgPool {
        sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool")
    }

    /// Build a context with the given identity and attempt counters, using
    /// the lazy pool and a default HTTP client.
    fn context_for(job_id: Uuid, job_type: &str, attempt: u32, max_attempts: u32) -> JobContext {
        JobContext::new(
            job_id,
            job_type.to_string(),
            attempt,
            max_attempts,
            lazy_pool(),
            reqwest::Client::new(),
        )
    }

    #[tokio::test]
    async fn test_job_context_creation() {
        let job_id = Uuid::new_v4();
        let ctx = context_for(job_id, "test_job", 1, 3);

        // Constructor arguments land in the matching public fields.
        assert_eq!(ctx.job_id, job_id);
        assert_eq!(ctx.job_type, "test_job");
        assert_eq!(ctx.attempt, 1);
        assert_eq!(ctx.max_attempts, 3);

        // First of three attempts: neither a retry nor the last one.
        assert!(!ctx.is_retry());
        assert!(!ctx.is_last_attempt());
    }

    #[tokio::test]
    async fn test_is_retry() {
        let second_attempt = context_for(Uuid::new_v4(), "test", 2, 3);

        assert!(second_attempt.is_retry());
    }

    #[tokio::test]
    async fn test_is_last_attempt() {
        let final_attempt = context_for(Uuid::new_v4(), "test", 3, 3);

        assert!(final_attempt.is_last_attempt());
    }

    #[test]
    fn test_progress_update() {
        let ProgressUpdate {
            percentage,
            message,
            ..
        } = ProgressUpdate {
            job_id: Uuid::new_v4(),
            percentage: 50,
            message: "Halfway there".to_string(),
        };

        assert_eq!(percentage, 50);
        assert_eq!(message, "Halfway there");
    }
}
220}