aurora_db/workers/mod.rs
1// Durable Workers - Background job processing with persistence
2//
3// Provides a reliable background job system with:
4// - Job persistence across restarts
5// - Retry logic with exponential backoff
6// - Job scheduling and delayed execution
7// - Dead letter queue for failed jobs
8// - Job status tracking
9
10pub mod executor;
11pub mod job;
12pub mod queue;
13
14pub use executor::{WorkerConfig, WorkerExecutor};
15pub use job::{Job, JobPriority, JobStatus};
16pub use queue::JobQueue;
17
18use crate::error::Result;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
22/// Main worker system for durable background job processing
23///
24/// Provides a reliable background job system with job persistence, retry logic,
25/// scheduling, and status tracking. Perfect for email sending, image processing,
26/// report generation, and any task that should survive restarts.
27///
28/// # Features
29/// - Job persistence across restarts
30/// - Exponential backoff retry logic
31/// - Job scheduling and delayed execution
32/// - Dead letter queue for failed jobs
33/// - Priority-based job execution
34/// - Concurrent job processing
35/// - Job status tracking
36///
37/// # Examples
38///
39/// ```ignore
40/// use aurora_db::workers::{WorkerSystem, WorkerConfig, Job, JobPriority};
41/// use serde_json::json;
42///
43/// // Create worker system
44/// let config = WorkerConfig {
45/// storage_path: "./workers.db".to_string(),
46/// concurrency: 10,
47/// poll_interval_ms: 100,
48/// cleanup_interval_seconds: 3600,
49/// };
50///
51/// let workers = WorkerSystem::new(config)?;
52///
53/// // Register job handlers
54/// workers.register_handler("send_email", |job| async move {
55/// let to = job.payload.get("to").and_then(|v| v.as_str()).unwrap();
56/// let subject = job.payload.get("subject").and_then(|v| v.as_str()).unwrap();
57///
58/// // Send email
59/// send_email(to, subject).await?;
60/// Ok(())
61/// }).await;
62///
63/// // Start processing jobs
64/// workers.start().await?;
65///
66/// // Enqueue a job
67/// let job = Job::new("send_email")
68/// .add_field("to", json!("user@example.com"))
69/// .add_field("subject", json!("Welcome!"))
70/// .with_priority(JobPriority::High);
71///
72/// let job_id = workers.enqueue(job).await?;
73/// println!("Enqueued job: {}", job_id);
74/// ```
75pub struct WorkerSystem {
76 queue: Arc<JobQueue>,
77 executor: Arc<RwLock<WorkerExecutor>>,
78}
79
80impl WorkerSystem {
81 /// Create a new worker system
82 ///
83 /// # Arguments
84 /// * `config` - Worker configuration including storage path and concurrency settings
85 ///
86 /// # Examples
87 ///
88
89 /// let config = WorkerConfig {
90 /// storage_path: "./jobs.db".to_string(),
91 /// concurrency: 5,
92 /// poll_interval_ms: 100,
93 /// cleanup_interval_seconds: 3600,
94 /// };
95 ///
96 /// let workers = WorkerSystem::new(config)?;
97 /// ```ignore
98 pub fn new(config: WorkerConfig) -> Result<Self> {
99 let queue = Arc::new(JobQueue::new(config.storage_path.clone())?);
100 let executor = Arc::new(RwLock::new(WorkerExecutor::new(Arc::clone(&queue), config)));
101
102 Ok(Self { queue, executor })
103 }
104
105 /// Register a job handler
106 pub async fn register_handler<F, Fut>(&self, job_type: impl Into<String>, handler: F)
107 where
108 F: Fn(Job) -> Fut + Send + Sync + 'static,
109 Fut: std::future::Future<Output = Result<()>> + Send + 'static,
110 {
111 self.executor.read().await.register_handler(job_type, handler).await
112 }
113
114 /// Start the worker system
115 ///
116 /// Begins processing jobs from the queue. Jobs are executed concurrently
117 /// based on the `max_concurrent_jobs` configuration.
118 ///
119 /// # Examples
120 ///
121
122 /// let workers = WorkerSystem::new(config)?;
123 ///
124 /// // Register handlers first
125 /// workers.register_handler("task", handler).await;
126 ///
127 /// // Then start processing
128 /// workers.start().await?;
129 /// ```
130 pub async fn start(&self) -> Result<()> {
131 self.executor.write().await.start().await
132 }
133
134 /// Stop the worker system gracefully
135 ///
136 /// Waits for currently running jobs to complete before shutting down.
137 /// No new jobs will be picked up after calling this method.
138 ///
139 /// # Examples
140 ///
141
142 /// // Graceful shutdown
143 /// workers.stop().await?;
144 /// ```ignore
145 pub async fn stop(&self) -> Result<()> {
146 self.executor.write().await.stop().await
147 }
148
149 /// Enqueue a new job for processing
150 ///
151 /// Adds a job to the queue. It will be executed by a worker when available,
152 /// respecting priority and scheduling constraints.
153 ///
154 /// # Arguments
155 /// * `job` - The job to enqueue
156 ///
157 /// # Returns
158 /// The unique job ID for tracking status
159 ///
160 /// # Examples
161 ///
162
163 /// use serde_json::json;
164 ///
165 /// // Simple job
166 /// let job = Job::new("send_welcome_email")
167 /// .add_field("user_id", json!("123"))
168 /// .add_field("email", json!("user@example.com"));
169 ///
170 /// let job_id = workers.enqueue(job).await?;
171 ///
172 /// // High priority job
173 /// let urgent = Job::new("process_payment")
174 /// .add_field("amount", json!(99.99))
175 /// .with_priority(JobPriority::Critical)
176 /// .with_timeout(30); // 30 seconds
177 ///
178 /// workers.enqueue(urgent).await?;
179 ///
180 /// // Scheduled job (runs in 1 hour)
181 /// let scheduled = Job::new("send_reminder")
182 /// .add_field("message", json!("Meeting in 1 hour"))
183 /// .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
184 ///
185 /// workers.enqueue(scheduled).await?;
186 /// ```
187 pub async fn enqueue(&self, job: Job) -> Result<String> {
188 self.queue.enqueue(job).await
189 }
190
191 /// Get job status
192 ///
193 /// Retrieves the current status of a job by its ID.
194 ///
195 /// # Arguments
196 /// * `job_id` - The job ID returned from `enqueue()`
197 ///
198 /// # Returns
199 /// - `Some(JobStatus)` if job exists
200 /// - `None` if job not found
201 ///
202 /// # Examples
203 ///
204
205 /// let job_id = workers.enqueue(job).await?;
206 ///
207 /// // Check status later
208 /// if let Some(status) = workers.get_status(&job_id).await? {
209 /// match status {
210 /// JobStatus::Pending => println!("Waiting to run"),
211 /// JobStatus::Running => println!("Currently executing"),
212 /// JobStatus::Completed => println!("Done!"),
213 /// JobStatus::Failed { error, retries } => {
214 /// println!("Failed: {} (retries: {})", error, retries);
215 /// },
216 /// JobStatus::DeadLetter { error } => {
217 /// println!("Permanently failed: {}", error);
218 /// },
219 /// }
220 /// }
221 /// ```ignore
222 pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
223 self.queue.get_status(job_id).await
224 }
225
226 /// Get queue statistics
227 ///
228 /// Returns counts of jobs in various states for monitoring.
229 ///
230 /// # Returns
231 /// `QueueStats` with pending, running, completed, failed, and dead letter counts
232 ///
233 /// # Examples
234 ///
235
236 /// let stats = workers.stats().await?;
237 ///
238 /// println!("Queue status:");
239 /// println!(" Pending: {}", stats.pending);
240 /// println!(" Running: {}", stats.running);
241 /// println!(" Completed: {}", stats.completed);
242 /// println!(" Failed: {}", stats.failed);
243 /// println!(" Dead letter: {}", stats.dead_letter);
244 ///
245 /// // Alert on high failure rate
246 /// let total = stats.completed + stats.failed;
247 /// if total > 0 {
248 /// let failure_rate = stats.failed as f64 / total as f64;
249 /// if failure_rate > 0.10 {
250 /// alert!("High job failure rate: {:.1}%", failure_rate * 100.0);
251 /// }
252 /// }
253 /// ```
254 pub async fn stats(&self) -> Result<QueueStats> {
255 self.queue.stats().await
256 }
257}
258
/// Snapshot of how many jobs the queue holds in each state.
///
/// Returned by `WorkerSystem::stats` for monitoring. `Default` yields an
/// all-zero snapshot, and `PartialEq`/`Eq` allow two snapshots to be compared
/// directly (for example in tests or change detection).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Number of jobs counted as pending.
    pub pending: usize,
    /// Number of jobs counted as running.
    pub running: usize,
    /// Number of jobs counted as completed.
    pub completed: usize,
    /// Number of jobs counted as failed.
    pub failed: usize,
    /// Number of jobs counted as being in the dead letter queue.
    pub dead_letter: usize,
}