Skip to main content

qml_rs/processing/
worker.rs

//! Worker types and configuration
//!
//! This module contains the worker configuration and the types used for job
//! execution context and results.
4
5use chrono::{DateTime, Duration, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use tokio_util::sync::CancellationToken;
9
10/// Configuration for worker instances
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct WorkerConfig {
13    /// Unique identifier for the worker
14    pub worker_id: String,
15    /// Name of the server where the worker is running
16    pub server_name: String,
17    /// Queues that this worker will process
18    pub queues: Vec<String>,
19    /// Timeout for job execution
20    pub job_timeout: Duration,
21    /// Polling interval for checking new jobs
22    pub polling_interval: Duration,
23}
24
25impl Default for WorkerConfig {
26    fn default() -> Self {
27        Self {
28            worker_id: uuid::Uuid::new_v4().to_string(),
29            server_name: "default".to_string(),
30            queues: vec!["default".to_string()],
31            job_timeout: Duration::minutes(5),
32            polling_interval: Duration::seconds(1),
33        }
34    }
35}
36
37impl WorkerConfig {
38    /// Create a new worker configuration with the specified worker ID
39    pub fn new(worker_id: impl Into<String>) -> Self {
40        Self {
41            worker_id: worker_id.into(),
42            ..Default::default()
43        }
44    }
45
46    /// Set the server name
47    pub fn server_name(mut self, server_name: impl Into<String>) -> Self {
48        self.server_name = server_name.into();
49        self
50    }
51
52    /// Set the queues this worker will process
53    pub fn queues(mut self, queues: Vec<String>) -> Self {
54        self.queues = queues;
55        self
56    }
57
58    /// Set the job timeout
59    pub fn job_timeout(mut self, timeout: Duration) -> Self {
60        self.job_timeout = timeout;
61        self
62    }
63
64    /// Set the polling interval
65    pub fn polling_interval(mut self, interval: Duration) -> Self {
66        self.polling_interval = interval;
67        self
68    }
69}
70
71/// Context information provided to workers during job execution
72#[derive(Debug, Clone)]
73pub struct WorkerContext {
74    /// Worker configuration
75    pub config: WorkerConfig,
76    /// When the job execution started
77    pub started_at: DateTime<Utc>,
78    /// Metadata for the current execution
79    pub execution_metadata: HashMap<String, String>,
80    /// Attempt number (for retries)
81    pub attempt: u32,
82    /// Previous exception if this is a retry
83    pub previous_exception: Option<String>,
84    /// Cancellation token for cooperative shutdown. A long-running worker
85    /// impl can race its work against this token to drop out cleanly when
86    /// the server is asked to stop:
87    ///
88    /// ```ignore
89    /// tokio::select! {
90    ///     _ = ctx.cancel.cancelled() => Ok(WorkerResult::retry(
91    ///         "shutting down".into(),
92    ///         None,
93    ///     )),
94    ///     res = do_expensive_work() => res,
95    /// }
96    /// ```
97    ///
98    /// The token is a child of the `BackgroundJobServer` shutdown token, so
99    /// calling `server.stop()` flips every context in flight.
100    pub cancel: CancellationToken,
101}
102
103impl WorkerContext {
104    /// Create a new worker context with a detached cancellation token. The
105    /// server installs a real, shutdown-linked token via
106    /// `WorkerContext::with_cancel`.
107    pub fn new(config: WorkerConfig) -> Self {
108        Self {
109            config,
110            started_at: Utc::now(),
111            execution_metadata: HashMap::new(),
112            attempt: 1,
113            previous_exception: None,
114            cancel: CancellationToken::new(),
115        }
116    }
117
118    /// Create a retry context from a previous attempt
119    pub fn retry_from(
120        config: WorkerConfig,
121        attempt: u32,
122        previous_exception: Option<String>,
123    ) -> Self {
124        Self {
125            config,
126            started_at: Utc::now(),
127            execution_metadata: HashMap::new(),
128            attempt,
129            previous_exception,
130            cancel: CancellationToken::new(),
131        }
132    }
133
134    /// Override the cancellation token. Builder-style so the server can
135    /// install the shutdown-linked child token.
136    pub fn with_cancel(mut self, cancel: CancellationToken) -> Self {
137        self.cancel = cancel;
138        self
139    }
140
141    /// Add execution metadata
142    pub fn add_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
143        self.execution_metadata.insert(key.into(), value.into());
144    }
145
146    /// Get execution duration so far
147    pub fn duration(&self) -> Duration {
148        Utc::now() - self.started_at
149    }
150
151    /// Check if the job has timed out
152    pub fn is_timed_out(&self) -> bool {
153        self.duration() > self.config.job_timeout
154    }
155
156    /// Check if this is a retry attempt
157    pub fn is_retry(&self) -> bool {
158        self.attempt > 1
159    }
160}
161
162/// Result of job execution
163#[derive(Debug, Clone, Serialize, Deserialize)]
164pub enum WorkerResult {
165    /// Job completed successfully
166    Success {
167        /// Optional result data
168        result: Option<String>,
169        /// Execution duration in milliseconds
170        duration_ms: u64,
171        /// Metadata about the execution
172        metadata: HashMap<String, String>,
173    },
174    /// Job failed and should be retried
175    Retry {
176        /// Error message
177        error: String,
178        /// Stack trace if available
179        stack_trace: Option<String>,
180        /// When to retry (None for immediate retry)
181        retry_at: Option<DateTime<Utc>>,
182        /// Additional context about the failure
183        context: HashMap<String, String>,
184    },
185    /// Job failed permanently (no retry)
186    Failure {
187        /// Error message
188        error: String,
189        /// Stack trace if available
190        stack_trace: Option<String>,
191        /// Additional context about the failure
192        context: HashMap<String, String>,
193    },
194}
195
196impl WorkerResult {
197    /// Create a successful result
198    pub fn success(result: Option<String>, duration_ms: u64) -> Self {
199        Self::Success {
200            result,
201            duration_ms,
202            metadata: HashMap::new(),
203        }
204    }
205
206    /// Create a successful result with metadata
207    pub fn success_with_metadata(
208        result: Option<String>,
209        duration_ms: u64,
210        metadata: HashMap<String, String>,
211    ) -> Self {
212        Self::Success {
213            result,
214            duration_ms,
215            metadata,
216        }
217    }
218
219    /// Create a retry result
220    pub fn retry(error: String, retry_at: Option<DateTime<Utc>>) -> Self {
221        Self::Retry {
222            error,
223            stack_trace: None,
224            retry_at,
225            context: HashMap::new(),
226        }
227    }
228
229    /// Create a retry result with context
230    pub fn retry_with_context(
231        error: String,
232        retry_at: Option<DateTime<Utc>>,
233        context: HashMap<String, String>,
234    ) -> Self {
235        Self::Retry {
236            error,
237            stack_trace: None,
238            retry_at,
239            context,
240        }
241    }
242
243    /// Create a permanent failure result
244    pub fn failure(error: String) -> Self {
245        Self::Failure {
246            error,
247            stack_trace: None,
248            context: HashMap::new(),
249        }
250    }
251
252    /// Create a permanent failure result with context
253    pub fn failure_with_context(error: String, context: HashMap<String, String>) -> Self {
254        Self::Failure {
255            error,
256            stack_trace: None,
257            context,
258        }
259    }
260
261    /// Check if the result indicates success
262    pub fn is_success(&self) -> bool {
263        matches!(self, WorkerResult::Success { .. })
264    }
265
266    /// Check if the result indicates a retry should be attempted
267    pub fn should_retry(&self) -> bool {
268        matches!(self, WorkerResult::Retry { .. })
269    }
270
271    /// Check if the result indicates permanent failure
272    pub fn is_failure(&self) -> bool {
273        matches!(self, WorkerResult::Failure { .. })
274    }
275
276    /// Get the error message if this is an error result
277    pub fn error_message(&self) -> Option<&str> {
278        match self {
279            WorkerResult::Retry { error, .. } | WorkerResult::Failure { error, .. } => Some(error),
280            WorkerResult::Success { .. } => None,
281        }
282    }
283}