forge_core/job/
context.rs1use std::sync::mpsc;
2
3use uuid::Uuid;
4
5use crate::function::AuthContext;
6
7pub struct JobContext {
9 pub job_id: Uuid,
11 pub job_type: String,
13 pub attempt: u32,
15 pub max_attempts: u32,
17 pub auth: AuthContext,
19 db_pool: sqlx::PgPool,
21 http_client: reqwest::Client,
23 progress_tx: Option<mpsc::Sender<ProgressUpdate>>,
25}
26
27#[derive(Debug, Clone)]
29pub struct ProgressUpdate {
30 pub job_id: Uuid,
32 pub percentage: u8,
34 pub message: String,
36}
37
38impl JobContext {
39 pub fn new(
41 job_id: Uuid,
42 job_type: String,
43 attempt: u32,
44 max_attempts: u32,
45 db_pool: sqlx::PgPool,
46 http_client: reqwest::Client,
47 ) -> Self {
48 Self {
49 job_id,
50 job_type,
51 attempt,
52 max_attempts,
53 auth: AuthContext::unauthenticated(),
54 db_pool,
55 http_client,
56 progress_tx: None,
57 }
58 }
59
60 pub fn with_auth(mut self, auth: AuthContext) -> Self {
62 self.auth = auth;
63 self
64 }
65
66 pub fn with_progress(mut self, tx: mpsc::Sender<ProgressUpdate>) -> Self {
68 self.progress_tx = Some(tx);
69 self
70 }
71
72 pub fn db(&self) -> &sqlx::PgPool {
74 &self.db_pool
75 }
76
77 pub fn http(&self) -> &reqwest::Client {
79 &self.http_client
80 }
81
82 pub fn progress(&self, percentage: u8, message: impl Into<String>) -> crate::Result<()> {
84 let update = ProgressUpdate {
85 job_id: self.job_id,
86 percentage: percentage.min(100),
87 message: message.into(),
88 };
89
90 if let Some(ref tx) = self.progress_tx {
91 tx.send(update)
92 .map_err(|e| crate::ForgeError::Job(format!("Failed to send progress: {}", e)))?;
93 }
94
95 Ok(())
96 }
97
98 pub async fn heartbeat(&self) -> crate::Result<()> {
100 sqlx::query(
101 r#"
102 UPDATE forge_jobs
103 SET last_heartbeat = NOW()
104 WHERE id = $1
105 "#,
106 )
107 .bind(self.job_id)
108 .execute(&self.db_pool)
109 .await
110 .map_err(|e| crate::ForgeError::Database(e.to_string()))?;
111
112 Ok(())
113 }
114
115 pub fn is_retry(&self) -> bool {
117 self.attempt > 1
118 }
119
120 pub fn is_last_attempt(&self) -> bool {
122 self.attempt >= self.max_attempts
123 }
124}
125
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pool with `connect_lazy`, which defers connecting; the tests
    /// below never run a query, so no database needs to be reachable.
    fn lazy_pool() -> sqlx::PgPool {
        sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool")
    }

    /// Builds a `JobContext` backed by `lazy_pool()` and a default HTTP client.
    fn make_context(job_id: Uuid, job_type: &str, attempt: u32, max_attempts: u32) -> JobContext {
        JobContext::new(
            job_id,
            job_type.to_string(),
            attempt,
            max_attempts,
            lazy_pool(),
            reqwest::Client::new(),
        )
    }

    #[tokio::test]
    async fn test_job_context_creation() {
        let job_id = Uuid::new_v4();
        let ctx = make_context(job_id, "test_job", 1, 3);

        assert_eq!(ctx.job_id, job_id);
        assert_eq!(ctx.job_type, "test_job");
        assert_eq!(ctx.attempt, 1);
        assert_eq!(ctx.max_attempts, 3);
        assert!(!ctx.is_retry());
        assert!(!ctx.is_last_attempt());
    }

    #[tokio::test]
    async fn test_is_retry() {
        let ctx = make_context(Uuid::new_v4(), "test", 2, 3);

        assert!(ctx.is_retry());
    }

    #[tokio::test]
    async fn test_is_last_attempt() {
        let ctx = make_context(Uuid::new_v4(), "test", 3, 3);

        assert!(ctx.is_last_attempt());
    }

    #[test]
    fn test_progress_update() {
        let update = ProgressUpdate {
            job_id: Uuid::new_v4(),
            percentage: 50,
            message: "Halfway there".to_string(),
        };

        assert_eq!(update.percentage, 50);
        assert_eq!(update.message, "Halfway there");
    }

    #[tokio::test]
    async fn test_progress_without_listener_is_ok() {
        let ctx = make_context(Uuid::new_v4(), "test", 1, 3);

        // No channel attached: reporting progress must succeed silently.
        assert!(ctx.progress(10, "ignored").is_ok());
    }

    #[tokio::test]
    async fn test_progress_delivers_and_clamps_percentage() {
        let job_id = Uuid::new_v4();
        let (tx, rx) = mpsc::channel();
        let ctx = make_context(job_id, "test", 1, 3).with_progress(tx);

        assert!(ctx.progress(150, "done").is_ok());

        let update = rx.try_recv().expect("progress update should be queued");
        assert_eq!(update.job_id, job_id);
        // Values above 100 are capped.
        assert_eq!(update.percentage, 100);
        assert_eq!(update.message, "done");
    }

    #[tokio::test]
    async fn test_progress_fails_when_receiver_dropped() {
        let (tx, rx) = mpsc::channel();
        let ctx = make_context(Uuid::new_v4(), "test", 1, 3).with_progress(tx);
        drop(rx);

        assert!(ctx.progress(10, "nobody listening").is_err());
    }
}