aurora_db/workers/mod.rs
//! Durable Workers - Background job processing with persistence
//!
//! Provides a reliable background job system with:
//! - Job persistence across restarts
//! - Retry logic with exponential backoff
//! - Job scheduling and delayed execution
//! - Dead letter queue for failed jobs
//! - Job status tracking
9
10pub mod executor;
11pub mod job;
12pub mod queue;
13
14pub use executor::{WorkerConfig, WorkerExecutor};
15pub use job::{Job, JobPriority, JobStatus};
16pub use queue::JobQueue;
17
18use crate::error::Result;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
22/// Main worker system for durable background job processing
23///
24/// Provides a reliable background job system with job persistence, retry logic,
25/// scheduling, and status tracking. Perfect for email sending, image processing,
26/// report generation, and any task that should survive restarts.
27///
28/// # Features
29/// - Job persistence across restarts
30/// - Exponential backoff retry logic
31/// - Job scheduling and delayed execution
32/// - Dead letter queue for failed jobs
33/// - Priority-based job execution
34/// - Concurrent job processing
35/// - Job status tracking
36///
37/// # Examples
38///
39/// ```ignore
40/// use aurora_db::workers::{WorkerSystem, WorkerConfig, Job, JobPriority};
41/// use serde_json::json;
42///
43/// // Create worker system
44/// let config = WorkerConfig {
45/// storage_path: "./workers.db".to_string(),
46/// max_concurrent_jobs: 10,
47/// poll_interval_ms: 100,
48/// };
49///
50/// let workers = WorkerSystem::new(config)?;
51///
52/// // Register job handlers
53/// workers.register_handler("send_email", |job| async move {
54/// let to = job.payload.get("to").and_then(|v| v.as_str()).unwrap();
55/// let subject = job.payload.get("subject").and_then(|v| v.as_str()).unwrap();
56///
57/// // Send email
58/// send_email(to, subject).await?;
59/// Ok(())
60/// }).await;
61///
62/// // Start processing jobs
63/// workers.start().await?;
64///
65/// // Enqueue a job
66/// let job = Job::new("send_email")
67/// .add_field("to", json!("user@example.com"))
68/// .add_field("subject", json!("Welcome!"))
69/// .with_priority(JobPriority::High);
70///
71/// let job_id = workers.enqueue(job).await?;
72/// println!("Enqueued job: {}", job_id);
73/// ```
74pub struct WorkerSystem {
75 queue: Arc<JobQueue>,
76 executor: Arc<RwLock<WorkerExecutor>>,
77}
78
79impl WorkerSystem {
80 /// Create a new worker system
81 ///
82 /// # Arguments
83 /// * `config` - Worker configuration including storage path and concurrency settings
84 ///
85 /// # Examples
86 ///
87
88 /// let config = WorkerConfig {
89 /// storage_path: "./jobs.db".to_string(),
90 /// max_concurrent_jobs: 5,
91 /// poll_interval_ms: 100,
92 /// };
93 ///
94 /// let workers = WorkerSystem::new(config)?;
95 /// ```ignore
96 pub fn new(config: WorkerConfig) -> Result<Self> {
97 let queue = Arc::new(JobQueue::new(config.storage_path.clone())?);
98 let executor = Arc::new(RwLock::new(WorkerExecutor::new(Arc::clone(&queue), config)));
99
100 Ok(Self { queue, executor })
101 }
102
103 /// Register a job handler
104 pub async fn register_handler<F, Fut>(&self, job_type: impl Into<String>, handler: F)
105 where
106 F: Fn(Job) -> Fut + Send + Sync + 'static,
107 Fut: std::future::Future<Output = Result<()>> + Send + 'static,
108 {
109 self.executor.read().await.register_handler(job_type, handler).await
110 }
111
112 /// Start the worker system
113 ///
114 /// Begins processing jobs from the queue. Jobs are executed concurrently
115 /// based on the `max_concurrent_jobs` configuration.
116 ///
117 /// # Examples
118 ///
119
120 /// let workers = WorkerSystem::new(config)?;
121 ///
122 /// // Register handlers first
123 /// workers.register_handler("task", handler).await;
124 ///
125 /// // Then start processing
126 /// workers.start().await?;
127 /// ```
128 pub async fn start(&self) -> Result<()> {
129 self.executor.write().await.start().await
130 }
131
132 /// Stop the worker system gracefully
133 ///
134 /// Waits for currently running jobs to complete before shutting down.
135 /// No new jobs will be picked up after calling this method.
136 ///
137 /// # Examples
138 ///
139
140 /// // Graceful shutdown
141 /// workers.stop().await?;
142 /// ```ignore
143 pub async fn stop(&self) -> Result<()> {
144 self.executor.write().await.stop().await
145 }
146
147 /// Enqueue a new job for processing
148 ///
149 /// Adds a job to the queue. It will be executed by a worker when available,
150 /// respecting priority and scheduling constraints.
151 ///
152 /// # Arguments
153 /// * `job` - The job to enqueue
154 ///
155 /// # Returns
156 /// The unique job ID for tracking status
157 ///
158 /// # Examples
159 ///
160
161 /// use serde_json::json;
162 ///
163 /// // Simple job
164 /// let job = Job::new("send_welcome_email")
165 /// .add_field("user_id", json!("123"))
166 /// .add_field("email", json!("user@example.com"));
167 ///
168 /// let job_id = workers.enqueue(job).await?;
169 ///
170 /// // High priority job
171 /// let urgent = Job::new("process_payment")
172 /// .add_field("amount", json!(99.99))
173 /// .with_priority(JobPriority::Critical)
174 /// .with_timeout(30); // 30 seconds
175 ///
176 /// workers.enqueue(urgent).await?;
177 ///
178 /// // Scheduled job (runs in 1 hour)
179 /// let scheduled = Job::new("send_reminder")
180 /// .add_field("message", json!("Meeting in 1 hour"))
181 /// .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
182 ///
183 /// workers.enqueue(scheduled).await?;
184 /// ```
185 pub async fn enqueue(&self, job: Job) -> Result<String> {
186 self.queue.enqueue(job).await
187 }
188
189 /// Get job status
190 ///
191 /// Retrieves the current status of a job by its ID.
192 ///
193 /// # Arguments
194 /// * `job_id` - The job ID returned from `enqueue()`
195 ///
196 /// # Returns
197 /// - `Some(JobStatus)` if job exists
198 /// - `None` if job not found
199 ///
200 /// # Examples
201 ///
202
203 /// let job_id = workers.enqueue(job).await?;
204 ///
205 /// // Check status later
206 /// if let Some(status) = workers.get_status(&job_id).await? {
207 /// match status {
208 /// JobStatus::Pending => println!("Waiting to run"),
209 /// JobStatus::Running => println!("Currently executing"),
210 /// JobStatus::Completed => println!("Done!"),
211 /// JobStatus::Failed { error, retries } => {
212 /// println!("Failed: {} (retries: {})", error, retries);
213 /// },
214 /// JobStatus::DeadLetter { error } => {
215 /// println!("Permanently failed: {}", error);
216 /// },
217 /// }
218 /// }
219 /// ```ignore
220 pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
221 self.queue.get_status(job_id).await
222 }
223
224 /// Get queue statistics
225 ///
226 /// Returns counts of jobs in various states for monitoring.
227 ///
228 /// # Returns
229 /// `QueueStats` with pending, running, completed, failed, and dead letter counts
230 ///
231 /// # Examples
232 ///
233
234 /// let stats = workers.stats().await?;
235 ///
236 /// println!("Queue status:");
237 /// println!(" Pending: {}", stats.pending);
238 /// println!(" Running: {}", stats.running);
239 /// println!(" Completed: {}", stats.completed);
240 /// println!(" Failed: {}", stats.failed);
241 /// println!(" Dead letter: {}", stats.dead_letter);
242 ///
243 /// // Alert on high failure rate
244 /// let total = stats.completed + stats.failed;
245 /// if total > 0 {
246 /// let failure_rate = stats.failed as f64 / total as f64;
247 /// if failure_rate > 0.10 {
248 /// alert!("High job failure rate: {:.1}%", failure_rate * 100.0);
249 /// }
250 /// }
251 /// ```
252 pub async fn stats(&self) -> Result<QueueStats> {
253 self.queue.stats().await
254 }
255}
256
/// Job counts grouped by state, as returned by `WorkerSystem::stats`.
///
/// Every field is a plain counter, so the type also derives `Default`
/// (all zeros) and `PartialEq`/`Eq`, which makes it easy to build and
/// compare values in monitoring code and tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Number of jobs in the pending state.
    pub pending: usize,
    /// Number of jobs in the running state.
    pub running: usize,
    /// Number of jobs in the completed state.
    pub completed: usize,
    /// Number of jobs in the failed state.
    pub failed: usize,
    /// Number of jobs in the dead letter queue.
    pub dead_letter: usize,
}