forge_core/job/
context.rs

1use std::sync::mpsc;
2
3use uuid::Uuid;
4
5use crate::function::AuthContext;
6
7/// Context available to job handlers.
8pub struct JobContext {
9    /// Job ID.
10    pub job_id: Uuid,
11    /// Job type/name.
12    pub job_type: String,
13    /// Current attempt number (1-based).
14    pub attempt: u32,
15    /// Maximum attempts allowed.
16    pub max_attempts: u32,
17    /// Authentication context (for queries/mutations).
18    pub auth: AuthContext,
19    /// Database pool.
20    db_pool: sqlx::PgPool,
21    /// HTTP client for external calls.
22    http_client: reqwest::Client,
23    /// Progress reporter (sync channel for simplicity).
24    progress_tx: Option<mpsc::Sender<ProgressUpdate>>,
25}
26
27/// Progress update message.
28#[derive(Debug, Clone)]
29pub struct ProgressUpdate {
30    /// Job ID.
31    pub job_id: Uuid,
32    /// Progress percentage (0-100).
33    pub percentage: u8,
34    /// Status message.
35    pub message: String,
36}
37
38impl JobContext {
39    /// Create a new job context.
40    pub fn new(
41        job_id: Uuid,
42        job_type: String,
43        attempt: u32,
44        max_attempts: u32,
45        db_pool: sqlx::PgPool,
46        http_client: reqwest::Client,
47    ) -> Self {
48        Self {
49            job_id,
50            job_type,
51            attempt,
52            max_attempts,
53            auth: AuthContext::unauthenticated(),
54            db_pool,
55            http_client,
56            progress_tx: None,
57        }
58    }
59
60    /// Set authentication context.
61    pub fn with_auth(mut self, auth: AuthContext) -> Self {
62        self.auth = auth;
63        self
64    }
65
66    /// Set progress channel.
67    pub fn with_progress(mut self, tx: mpsc::Sender<ProgressUpdate>) -> Self {
68        self.progress_tx = Some(tx);
69        self
70    }
71
72    /// Get database pool.
73    pub fn db(&self) -> &sqlx::PgPool {
74        &self.db_pool
75    }
76
77    /// Get HTTP client.
78    pub fn http(&self) -> &reqwest::Client {
79        &self.http_client
80    }
81
82    /// Report job progress.
83    pub fn progress(&self, percentage: u8, message: impl Into<String>) -> crate::Result<()> {
84        let update = ProgressUpdate {
85            job_id: self.job_id,
86            percentage: percentage.min(100),
87            message: message.into(),
88        };
89
90        if let Some(ref tx) = self.progress_tx {
91            tx.send(update)
92                .map_err(|e| crate::ForgeError::Job(format!("Failed to send progress: {}", e)))?;
93        }
94
95        Ok(())
96    }
97
98    /// Send heartbeat to keep job alive (async).
99    pub async fn heartbeat(&self) -> crate::Result<()> {
100        sqlx::query(
101            r#"
102            UPDATE forge_jobs
103            SET last_heartbeat = NOW()
104            WHERE id = $1
105            "#,
106        )
107        .bind(self.job_id)
108        .execute(&self.db_pool)
109        .await
110        .map_err(|e| crate::ForgeError::Database(e.to_string()))?;
111
112        Ok(())
113    }
114
115    /// Check if this is a retry attempt.
116    pub fn is_retry(&self) -> bool {
117        self.attempt > 1
118    }
119
120    /// Check if this is the last attempt.
121    pub fn is_last_attempt(&self) -> bool {
122        self.attempt >= self.max_attempts
123    }
124}
125
126#[cfg(test)]
127mod tests {
128    use super::*;
129
130    #[tokio::test]
131    async fn test_job_context_creation() {
132        let pool = sqlx::postgres::PgPoolOptions::new()
133            .max_connections(1)
134            .connect_lazy("postgres://localhost/nonexistent")
135            .expect("Failed to create mock pool");
136
137        let job_id = Uuid::new_v4();
138        let ctx = JobContext::new(
139            job_id,
140            "test_job".to_string(),
141            1,
142            3,
143            pool,
144            reqwest::Client::new(),
145        );
146
147        assert_eq!(ctx.job_id, job_id);
148        assert_eq!(ctx.job_type, "test_job");
149        assert_eq!(ctx.attempt, 1);
150        assert_eq!(ctx.max_attempts, 3);
151        assert!(!ctx.is_retry());
152        assert!(!ctx.is_last_attempt());
153    }
154
155    #[tokio::test]
156    async fn test_is_retry() {
157        let pool = sqlx::postgres::PgPoolOptions::new()
158            .max_connections(1)
159            .connect_lazy("postgres://localhost/nonexistent")
160            .expect("Failed to create mock pool");
161
162        let ctx = JobContext::new(
163            Uuid::new_v4(),
164            "test".to_string(),
165            2,
166            3,
167            pool,
168            reqwest::Client::new(),
169        );
170
171        assert!(ctx.is_retry());
172    }
173
174    #[tokio::test]
175    async fn test_is_last_attempt() {
176        let pool = sqlx::postgres::PgPoolOptions::new()
177            .max_connections(1)
178            .connect_lazy("postgres://localhost/nonexistent")
179            .expect("Failed to create mock pool");
180
181        let ctx = JobContext::new(
182            Uuid::new_v4(),
183            "test".to_string(),
184            3,
185            3,
186            pool,
187            reqwest::Client::new(),
188        );
189
190        assert!(ctx.is_last_attempt());
191    }
192
193    #[test]
194    fn test_progress_update() {
195        let update = ProgressUpdate {
196            job_id: Uuid::new_v4(),
197            percentage: 50,
198            message: "Halfway there".to_string(),
199        };
200
201        assert_eq!(update.percentage, 50);
202        assert_eq!(update.message, "Halfway there");
203    }
204}