qml_rs/processing/
worker.rs

1//! Worker types and configuration
2//!
3//! This module contains the types used for job execution context and results.
4
5use chrono::{DateTime, Duration, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9/// Configuration for worker instances
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct WorkerConfig {
12    /// Unique identifier for the worker
13    pub worker_id: String,
14    /// Name of the server where the worker is running
15    pub server_name: String,
16    /// Queues that this worker will process
17    pub queues: Vec<String>,
18    /// Maximum number of concurrent jobs this worker can handle
19    pub concurrency: usize,
20    /// Timeout for job execution
21    pub job_timeout: Duration,
22    /// Polling interval for checking new jobs
23    pub polling_interval: Duration,
24    /// Whether to automatically retry failed jobs
25    pub auto_retry: bool,
26    /// Maximum number of retry attempts
27    pub max_retries: u32,
28}
29
30impl Default for WorkerConfig {
31    fn default() -> Self {
32        Self {
33            worker_id: uuid::Uuid::new_v4().to_string(),
34            server_name: "default".to_string(),
35            queues: vec!["default".to_string()],
36            concurrency: 5,
37            job_timeout: Duration::minutes(5),
38            polling_interval: Duration::seconds(1),
39            auto_retry: true,
40            max_retries: 3,
41        }
42    }
43}
44
45impl WorkerConfig {
46    /// Create a new worker configuration with the specified worker ID
47    pub fn new(worker_id: impl Into<String>) -> Self {
48        Self {
49            worker_id: worker_id.into(),
50            ..Default::default()
51        }
52    }
53
54    /// Set the server name
55    pub fn server_name(mut self, server_name: impl Into<String>) -> Self {
56        self.server_name = server_name.into();
57        self
58    }
59
60    /// Set the queues this worker will process
61    pub fn queues(mut self, queues: Vec<String>) -> Self {
62        self.queues = queues;
63        self
64    }
65
66    /// Set the concurrency level
67    pub fn concurrency(mut self, concurrency: usize) -> Self {
68        self.concurrency = concurrency;
69        self
70    }
71
72    /// Set the job timeout
73    pub fn job_timeout(mut self, timeout: Duration) -> Self {
74        self.job_timeout = timeout;
75        self
76    }
77
78    /// Set the polling interval
79    pub fn polling_interval(mut self, interval: Duration) -> Self {
80        self.polling_interval = interval;
81        self
82    }
83
84    /// Set auto retry behavior
85    pub fn auto_retry(mut self, auto_retry: bool) -> Self {
86        self.auto_retry = auto_retry;
87        self
88    }
89
90    /// Set maximum retry attempts
91    pub fn max_retries(mut self, max_retries: u32) -> Self {
92        self.max_retries = max_retries;
93        self
94    }
95}
96
97/// Context information provided to workers during job execution
98#[derive(Debug, Clone)]
99pub struct WorkerContext {
100    /// Worker configuration
101    pub config: WorkerConfig,
102    /// When the job execution started
103    pub started_at: DateTime<Utc>,
104    /// Metadata for the current execution
105    pub execution_metadata: HashMap<String, String>,
106    /// Attempt number (for retries)
107    pub attempt: u32,
108    /// Previous exception if this is a retry
109    pub previous_exception: Option<String>,
110}
111
112impl WorkerContext {
113    /// Create a new worker context
114    pub fn new(config: WorkerConfig) -> Self {
115        Self {
116            config,
117            started_at: Utc::now(),
118            execution_metadata: HashMap::new(),
119            attempt: 1,
120            previous_exception: None,
121        }
122    }
123
124    /// Create a retry context from a previous attempt
125    pub fn retry_from(
126        config: WorkerConfig,
127        attempt: u32,
128        previous_exception: Option<String>,
129    ) -> Self {
130        Self {
131            config,
132            started_at: Utc::now(),
133            execution_metadata: HashMap::new(),
134            attempt,
135            previous_exception,
136        }
137    }
138
139    /// Add execution metadata
140    pub fn add_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
141        self.execution_metadata.insert(key.into(), value.into());
142    }
143
144    /// Get execution duration so far
145    pub fn duration(&self) -> Duration {
146        Utc::now() - self.started_at
147    }
148
149    /// Check if the job has timed out
150    pub fn is_timed_out(&self) -> bool {
151        self.duration() > self.config.job_timeout
152    }
153
154    /// Check if this is a retry attempt
155    pub fn is_retry(&self) -> bool {
156        self.attempt > 1
157    }
158}
159
160/// Result of job execution
161#[derive(Debug, Clone, Serialize, Deserialize)]
162pub enum WorkerResult {
163    /// Job completed successfully
164    Success {
165        /// Optional result data
166        result: Option<String>,
167        /// Execution duration in milliseconds
168        duration_ms: u64,
169        /// Metadata about the execution
170        metadata: HashMap<String, String>,
171    },
172    /// Job failed and should be retried
173    Retry {
174        /// Error message
175        error: String,
176        /// Stack trace if available
177        stack_trace: Option<String>,
178        /// When to retry (None for immediate retry)
179        retry_at: Option<DateTime<Utc>>,
180        /// Additional context about the failure
181        context: HashMap<String, String>,
182    },
183    /// Job failed permanently (no retry)
184    Failure {
185        /// Error message
186        error: String,
187        /// Stack trace if available
188        stack_trace: Option<String>,
189        /// Additional context about the failure
190        context: HashMap<String, String>,
191    },
192}
193
194impl WorkerResult {
195    /// Create a successful result
196    pub fn success(result: Option<String>, duration_ms: u64) -> Self {
197        Self::Success {
198            result,
199            duration_ms,
200            metadata: HashMap::new(),
201        }
202    }
203
204    /// Create a successful result with metadata
205    pub fn success_with_metadata(
206        result: Option<String>,
207        duration_ms: u64,
208        metadata: HashMap<String, String>,
209    ) -> Self {
210        Self::Success {
211            result,
212            duration_ms,
213            metadata,
214        }
215    }
216
217    /// Create a retry result
218    pub fn retry(error: String, retry_at: Option<DateTime<Utc>>) -> Self {
219        Self::Retry {
220            error,
221            stack_trace: None,
222            retry_at,
223            context: HashMap::new(),
224        }
225    }
226
227    /// Create a retry result with context
228    pub fn retry_with_context(
229        error: String,
230        retry_at: Option<DateTime<Utc>>,
231        context: HashMap<String, String>,
232    ) -> Self {
233        Self::Retry {
234            error,
235            stack_trace: None,
236            retry_at,
237            context,
238        }
239    }
240
241    /// Create a permanent failure result
242    pub fn failure(error: String) -> Self {
243        Self::Failure {
244            error,
245            stack_trace: None,
246            context: HashMap::new(),
247        }
248    }
249
250    /// Create a permanent failure result with context
251    pub fn failure_with_context(error: String, context: HashMap<String, String>) -> Self {
252        Self::Failure {
253            error,
254            stack_trace: None,
255            context,
256        }
257    }
258
259    /// Check if the result indicates success
260    pub fn is_success(&self) -> bool {
261        matches!(self, WorkerResult::Success { .. })
262    }
263
264    /// Check if the result indicates a retry should be attempted
265    pub fn should_retry(&self) -> bool {
266        matches!(self, WorkerResult::Retry { .. })
267    }
268
269    /// Check if the result indicates permanent failure
270    pub fn is_failure(&self) -> bool {
271        matches!(self, WorkerResult::Failure { .. })
272    }
273
274    /// Get the error message if this is an error result
275    pub fn error_message(&self) -> Option<&str> {
276        match self {
277            WorkerResult::Retry { error, .. } | WorkerResult::Failure { error, .. } => Some(error),
278            WorkerResult::Success { .. } => None,
279        }
280    }
281}