// aurora_db/workers/mod.rs
// Durable Workers - Background job processing with persistence
//
// Provides a reliable background job system with:
// - Job persistence across restarts
// - Retry logic with exponential backoff
// - Job scheduling and delayed execution
// - Dead letter queue for failed jobs
// - Job status tracking
9
10pub mod executor;
11pub mod job;
12pub mod queue;
13
14pub use executor::{WorkerConfig, WorkerExecutor};
15pub use job::{Job, JobPriority, JobStatus};
16pub use queue::JobQueue;
17
18use crate::error::Result;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
22/// Main worker system for durable background job processing
23///
24/// Provides a reliable background job system with job persistence, retry logic,
25/// scheduling, and status tracking. Perfect for email sending, image processing,
26/// report generation, and any task that should survive restarts.
27///
28/// # Features
29/// - Job persistence across restarts
30/// - Exponential backoff retry logic
31/// - Job scheduling and delayed execution
32/// - Dead letter queue for failed jobs
33/// - Priority-based job execution
34/// - Concurrent job processing
35/// - Job status tracking
36///
37/// # Examples
38///
39/// ```
40/// use aurora_db::workers::{WorkerSystem, WorkerConfig, Job, JobPriority};
41/// use serde_json::json;
42///
43/// // Create worker system
44/// let config = WorkerConfig {
45/// storage_path: "./workers.db".to_string(),
46/// max_concurrent_jobs: 10,
47/// poll_interval_ms: 100,
48/// };
49///
50/// let workers = WorkerSystem::new(config)?;
51///
52/// // Register job handlers
53/// workers.register_handler("send_email", |job| async move {
54/// let to = job.payload.get("to").and_then(|v| v.as_str()).unwrap();
55/// let subject = job.payload.get("subject").and_then(|v| v.as_str()).unwrap();
56///
57/// // Send email
58/// send_email(to, subject).await?;
59/// Ok(())
60/// }).await;
61///
62/// // Start processing jobs
63/// workers.start().await?;
64///
65/// // Enqueue a job
66/// let job = Job::new("send_email")
67/// .add_field("to", json!("user@example.com"))
68/// .add_field("subject", json!("Welcome!"))
69/// .with_priority(JobPriority::High);
70///
71/// let job_id = workers.enqueue(job).await?;
72/// println!("Enqueued job: {}", job_id);
73/// ```
74pub struct WorkerSystem {
75 queue: Arc<JobQueue>,
76 executor: Arc<RwLock<WorkerExecutor>>,
77}
78
79impl WorkerSystem {
80 /// Create a new worker system
81 ///
82 /// # Arguments
83 /// * `config` - Worker configuration including storage path and concurrency settings
84 ///
85 /// # Examples
86 ///
87 /// ```
88 /// let config = WorkerConfig {
89 /// storage_path: "./jobs.db".to_string(),
90 /// max_concurrent_jobs: 5,
91 /// poll_interval_ms: 100,
92 /// };
93 ///
94 /// let workers = WorkerSystem::new(config)?;
95 /// ```
96 pub fn new(config: WorkerConfig) -> Result<Self> {
97 let queue = Arc::new(JobQueue::new(config.storage_path.clone())?);
98 let executor = Arc::new(RwLock::new(WorkerExecutor::new(Arc::clone(&queue), config)));
99
100 Ok(Self { queue, executor })
101 }
102
103 /// Start the worker system
104 ///
105 /// Begins processing jobs from the queue. Jobs are executed concurrently
106 /// based on the `max_concurrent_jobs` configuration.
107 ///
108 /// # Examples
109 ///
110 /// ```
111 /// let workers = WorkerSystem::new(config)?;
112 ///
113 /// // Register handlers first
114 /// workers.register_handler("task", handler).await;
115 ///
116 /// // Then start processing
117 /// workers.start().await?;
118 /// ```
119 pub async fn start(&self) -> Result<()> {
120 self.executor.write().await.start().await
121 }
122
123 /// Stop the worker system gracefully
124 ///
125 /// Waits for currently running jobs to complete before shutting down.
126 /// No new jobs will be picked up after calling this method.
127 ///
128 /// # Examples
129 ///
130 /// ```
131 /// // Graceful shutdown
132 /// workers.stop().await?;
133 /// ```
134 pub async fn stop(&self) -> Result<()> {
135 self.executor.write().await.stop().await
136 }
137
138 /// Enqueue a new job for processing
139 ///
140 /// Adds a job to the queue. It will be executed by a worker when available,
141 /// respecting priority and scheduling constraints.
142 ///
143 /// # Arguments
144 /// * `job` - The job to enqueue
145 ///
146 /// # Returns
147 /// The unique job ID for tracking status
148 ///
149 /// # Examples
150 ///
151 /// ```
152 /// use serde_json::json;
153 ///
154 /// // Simple job
155 /// let job = Job::new("send_welcome_email")
156 /// .add_field("user_id", json!("123"))
157 /// .add_field("email", json!("user@example.com"));
158 ///
159 /// let job_id = workers.enqueue(job).await?;
160 ///
161 /// // High priority job
162 /// let urgent = Job::new("process_payment")
163 /// .add_field("amount", json!(99.99))
164 /// .with_priority(JobPriority::Critical)
165 /// .with_timeout(30); // 30 seconds
166 ///
167 /// workers.enqueue(urgent).await?;
168 ///
169 /// // Scheduled job (runs in 1 hour)
170 /// let scheduled = Job::new("send_reminder")
171 /// .add_field("message", json!("Meeting in 1 hour"))
172 /// .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
173 ///
174 /// workers.enqueue(scheduled).await?;
175 /// ```
176 pub async fn enqueue(&self, job: Job) -> Result<String> {
177 self.queue.enqueue(job).await
178 }
179
180 /// Get job status
181 ///
182 /// Retrieves the current status of a job by its ID.
183 ///
184 /// # Arguments
185 /// * `job_id` - The job ID returned from `enqueue()`
186 ///
187 /// # Returns
188 /// - `Some(JobStatus)` if job exists
189 /// - `None` if job not found
190 ///
191 /// # Examples
192 ///
193 /// ```
194 /// let job_id = workers.enqueue(job).await?;
195 ///
196 /// // Check status later
197 /// if let Some(status) = workers.get_status(&job_id).await? {
198 /// match status {
199 /// JobStatus::Pending => println!("Waiting to run"),
200 /// JobStatus::Running => println!("Currently executing"),
201 /// JobStatus::Completed => println!("Done!"),
202 /// JobStatus::Failed { error, retries } => {
203 /// println!("Failed: {} (retries: {})", error, retries);
204 /// },
205 /// JobStatus::DeadLetter { error } => {
206 /// println!("Permanently failed: {}", error);
207 /// },
208 /// }
209 /// }
210 /// ```
211 pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
212 self.queue.get_status(job_id).await
213 }
214
215 /// Get queue statistics
216 ///
217 /// Returns counts of jobs in various states for monitoring.
218 ///
219 /// # Returns
220 /// `QueueStats` with pending, running, completed, failed, and dead letter counts
221 ///
222 /// # Examples
223 ///
224 /// ```
225 /// let stats = workers.stats().await?;
226 ///
227 /// println!("Queue status:");
228 /// println!(" Pending: {}", stats.pending);
229 /// println!(" Running: {}", stats.running);
230 /// println!(" Completed: {}", stats.completed);
231 /// println!(" Failed: {}", stats.failed);
232 /// println!(" Dead letter: {}", stats.dead_letter);
233 ///
234 /// // Alert on high failure rate
235 /// let total = stats.completed + stats.failed;
236 /// if total > 0 {
237 /// let failure_rate = stats.failed as f64 / total as f64;
238 /// if failure_rate > 0.10 {
239 /// alert!("High job failure rate: {:.1}%", failure_rate * 100.0);
240 /// }
241 /// }
242 /// ```
243 pub async fn stats(&self) -> Result<QueueStats> {
244 self.queue.stats().await
245 }
246}
247
/// Snapshot of job counts per state, as returned by `WorkerSystem::stats`.
///
/// `Default` yields all-zero counts; `PartialEq`/`Eq` allow comparing
/// snapshots directly (e.g. in tests or change detection).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Jobs waiting to be picked up.
    pub pending: usize,
    /// Jobs currently being executed.
    pub running: usize,
    /// Jobs that finished successfully.
    pub completed: usize,
    /// Jobs in the failed state.
    pub failed: usize,
    /// Jobs moved to the dead letter queue.
    pub dead_letter: usize,
}