aurora_db/workers/mod.rs
1// Durable Workers - Background job processing with persistence
2//
3// Provides a reliable background job system with:
4// - Job persistence across restarts
5// - Retry logic with exponential backoff
6// - Job scheduling and delayed execution
7// - Dead letter queue for failed jobs
8// - Job status tracking
9
10pub mod executor;
11pub mod job;
12pub mod queue;
13
14pub use executor::{WorkerConfig, WorkerExecutor};
15pub use job::{Job, JobPriority, JobStatus};
16pub use queue::JobQueue;
17
18use crate::error::Result;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
22/// Main worker system for durable background job processing
23///
24/// Provides a reliable background job system with job persistence, retry logic,
25/// scheduling, and status tracking. Perfect for email sending, image processing,
26/// report generation, and any task that should survive restarts.
27///
28/// # Features
29/// - Job persistence across restarts
30/// - Exponential backoff retry logic
31/// - Job scheduling and delayed execution
32/// - Dead letter queue for failed jobs
33/// - Priority-based job execution
34/// - Concurrent job processing
35/// - Job status tracking
36///
37/// # Examples
38///
39/// ```ignore
40/// use aurora_db::workers::{WorkerSystem, WorkerConfig, Job, JobPriority};
41/// use serde_json::json;
42///
43/// // Create worker system
44/// let config = WorkerConfig {
45/// storage_path: "./workers.db".to_string(),
46/// concurrency: 10,
47/// poll_interval_ms: 100,
48/// cleanup_interval_seconds: 3600,
49/// };
50///
51/// let workers = WorkerSystem::new(config)?;
52///
53/// // Register job handlers
54/// workers.register_handler("send_email", |job| async move {
55/// let to = job.payload.get("to").and_then(|v| v.as_str()).unwrap();
56/// let subject = job.payload.get("subject").and_then(|v| v.as_str()).unwrap();
57///
58/// // Send email
59/// send_email(to, subject).await?;
60/// Ok(())
61/// }).await;
62///
63/// // Start processing jobs
64/// workers.start().await?;
65///
66/// // Enqueue a job
67/// let job = Job::new("send_email")
68/// .add_field("to", json!("user@example.com"))
69/// .add_field("subject", json!("Welcome!"))
70/// .with_priority(JobPriority::High);
71///
72/// let job_id = workers.enqueue(job).await?;
73/// println!("Enqueued job: {}", job_id);
74/// ```
75pub struct WorkerSystem {
76 queue: Arc<JobQueue>,
77 executor: Arc<RwLock<WorkerExecutor>>,
78}
79
80impl WorkerSystem {
81 /// Create a new worker system
82 ///
83 /// # Arguments
84 /// * `config` - Worker configuration including storage path and concurrency settings
85 ///
86 /// # Examples
87 ///
88
89 /// let config = WorkerConfig {
90 /// storage_path: "./jobs.db".to_string(),
91 /// concurrency: 5,
92 /// poll_interval_ms: 100,
93 /// cleanup_interval_seconds: 3600,
94 /// };
95 ///
96 /// let workers = WorkerSystem::new(config)?;
97 /// ```ignore
98 pub fn new(config: WorkerConfig) -> Result<Self> {
99 let queue = Arc::new(JobQueue::new(config.storage_path.clone())?);
100 let executor = Arc::new(RwLock::new(WorkerExecutor::new(Arc::clone(&queue), config)));
101
102 Ok(Self { queue, executor })
103 }
104
105 /// Register a job handler
106 pub async fn register_handler<F, Fut>(&self, job_type: impl Into<String>, handler: F)
107 where
108 F: Fn(Job) -> Fut + Send + Sync + 'static,
109 Fut: std::future::Future<Output = Result<()>> + Send + 'static,
110 {
111 self.executor
112 .read()
113 .await
114 .register_handler(job_type, handler)
115 .await
116 }
117
118 /// Start the worker system
119 ///
120 /// Begins processing jobs from the queue. Jobs are executed concurrently
121 /// based on the `max_concurrent_jobs` configuration.
122 ///
123 /// # Examples
124 ///
125
126 /// let workers = WorkerSystem::new(config)?;
127 ///
128 /// // Register handlers first
129 /// workers.register_handler("task", handler).await;
130 ///
131 /// // Then start processing
132 /// workers.start().await?;
133 /// ```
134 pub async fn start(&self) -> Result<()> {
135 self.executor.write().await.start().await
136 }
137
138 /// Stop the worker system gracefully
139 ///
140 /// Waits for currently running jobs to complete before shutting down.
141 /// No new jobs will be picked up after calling this method.
142 ///
143 /// # Examples
144 ///
145
146 /// // Graceful shutdown
147 /// workers.stop().await?;
148 /// ```ignore
149 pub async fn stop(&self) -> Result<()> {
150 self.executor.write().await.stop().await
151 }
152
153 /// Enqueue a new job for processing
154 ///
155 /// Adds a job to the queue. It will be executed by a worker when available,
156 /// respecting priority and scheduling constraints.
157 ///
158 /// # Arguments
159 /// * `job` - The job to enqueue
160 ///
161 /// # Returns
162 /// The unique job ID for tracking status
163 ///
164 /// # Examples
165 ///
166
167 /// use serde_json::json;
168 ///
169 /// // Simple job
170 /// let job = Job::new("send_welcome_email")
171 /// .add_field("user_id", json!("123"))
172 /// .add_field("email", json!("user@example.com"));
173 ///
174 /// let job_id = workers.enqueue(job).await?;
175 ///
176 /// // High priority job
177 /// let urgent = Job::new("process_payment")
178 /// .add_field("amount", json!(99.99))
179 /// .with_priority(JobPriority::Critical)
180 /// .with_timeout(30); // 30 seconds
181 ///
182 /// workers.enqueue(urgent).await?;
183 ///
184 /// // Scheduled job (runs in 1 hour)
185 /// let scheduled = Job::new("send_reminder")
186 /// .add_field("message", json!("Meeting in 1 hour"))
187 /// .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
188 ///
189 /// workers.enqueue(scheduled).await?;
190 /// ```
191 pub async fn enqueue(&self, job: Job) -> Result<String> {
192 self.queue.enqueue(job).await
193 }
194
195 /// Get job status
196 ///
197 /// Retrieves the current status of a job by its ID.
198 ///
199 /// # Arguments
200 /// * `job_id` - The job ID returned from `enqueue()`
201 ///
202 /// # Returns
203 /// - `Some(JobStatus)` if job exists
204 /// - `None` if job not found
205 ///
206 /// # Examples
207 ///
208
209 /// let job_id = workers.enqueue(job).await?;
210 ///
211 /// // Check status later
212 /// if let Some(status) = workers.get_status(&job_id).await? {
213 /// match status {
214 /// JobStatus::Pending => println!("Waiting to run"),
215 /// JobStatus::Running => println!("Currently executing"),
216 /// JobStatus::Completed => println!("Done!"),
217 /// JobStatus::Failed { error, retries } => {
218 /// println!("Failed: {} (retries: {})", error, retries);
219 /// },
220 /// JobStatus::DeadLetter { error } => {
221 /// println!("Permanently failed: {}", error);
222 /// },
223 /// }
224 /// }
225 /// ```ignore
226 pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
227 self.queue.get_status(job_id).await
228 }
229
230 /// Get queue statistics
231 ///
232 /// Returns counts of jobs in various states for monitoring.
233 ///
234 /// # Returns
235 /// `QueueStats` with pending, running, completed, failed, and dead letter counts
236 ///
237 /// # Examples
238 ///
239
240 /// let stats = workers.stats().await?;
241 ///
242 /// println!("Queue status:");
243 /// println!(" Pending: {}", stats.pending);
244 /// println!(" Running: {}", stats.running);
245 /// println!(" Completed: {}", stats.completed);
246 /// println!(" Failed: {}", stats.failed);
247 /// println!(" Dead letter: {}", stats.dead_letter);
248 ///
249 /// // Alert on high failure rate
250 /// let total = stats.completed + stats.failed;
251 /// if total > 0 {
252 /// let failure_rate = stats.failed as f64 / total as f64;
253 /// if failure_rate > 0.10 {
254 /// alert!("High job failure rate: {:.1}%", failure_rate * 100.0);
255 /// }
256 /// }
257 /// ```
258 pub async fn stats(&self) -> Result<QueueStats> {
259 self.queue.stats().await
260 }
261}
262
/// Per-state job counts for a queue, as returned by `WorkerSystem::stats`.
///
/// `Default` yields all-zero counts, and `PartialEq`/`Eq` allow two snapshots
/// to be compared directly (for example in tests or change detection).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Number of jobs counted as pending.
    pub pending: usize,
    /// Number of jobs counted as running.
    pub running: usize,
    /// Number of jobs counted as completed.
    pub completed: usize,
    /// Number of jobs counted as failed.
    pub failed: usize,
    /// Number of jobs counted as being in the dead letter queue.
    pub dead_letter: usize,
}