Skip to main content

aurora_db/workers/
mod.rs

// Durable Workers - Background job processing with persistence
//
// Provides a reliable background job system with:
// - Job persistence across restarts
// - Retry logic with exponential backoff
// - Job scheduling and delayed execution
// - Dead letter queue for failed jobs
// - Job status tracking
9
10pub mod executor;
11pub mod job;
12pub mod queue;
13
14pub use executor::{WorkerConfig, WorkerExecutor};
15pub use job::{Job, JobPriority, JobStatus};
16pub use queue::JobQueue;
17
18use crate::error::Result;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
22/// Main worker system for durable background job processing
23///
24/// Provides a reliable background job system with job persistence, retry logic,
25/// scheduling, and status tracking. Perfect for email sending, image processing,
26/// report generation, and any task that should survive restarts.
27///
28/// # Features
29/// - Job persistence across restarts
30/// - Exponential backoff retry logic
31/// - Job scheduling and delayed execution
32/// - Dead letter queue for failed jobs
33/// - Priority-based job execution
34/// - Concurrent job processing
35/// - Job status tracking
36///
37/// # Examples
38///
39/// ```ignore
40/// use aurora_db::workers::{WorkerSystem, WorkerConfig, Job, JobPriority};
41/// use serde_json::json;
42///
43/// // Create worker system
44/// let config = WorkerConfig {
45///     storage_path: "./workers.db".to_string(),
46///     concurrency: 10,
47///     poll_interval_ms: 100,
48///     cleanup_interval_seconds: 3600,
49/// };
50///
51/// let workers = WorkerSystem::new(config)?;
52///
53/// // Register job handlers
54/// workers.register_handler("send_email", |job| async move {
55///     let to = job.payload.get("to").and_then(|v| v.as_str()).unwrap();
56///     let subject = job.payload.get("subject").and_then(|v| v.as_str()).unwrap();
57///
58///     // Send email
59///     send_email(to, subject).await?;
60///     Ok(())
61/// }).await;
62///
63/// // Start processing jobs
64/// workers.start().await?;
65///
66/// // Enqueue a job
67/// let job = Job::new("send_email")
68///     .add_field("to", json!("user@example.com"))
69///     .add_field("subject", json!("Welcome!"))
70///     .with_priority(JobPriority::High);
71///
72/// let job_id = workers.enqueue(job).await?;
73/// println!("Enqueued job: {}", job_id);
74/// ```
75pub struct WorkerSystem {
76    queue: Arc<JobQueue>,
77    executor: Arc<RwLock<WorkerExecutor>>,
78}
79
80impl WorkerSystem {
81    /// Create a new worker system
82    ///
83    /// # Arguments
84    /// * `config` - Worker configuration including storage path and concurrency settings
85    ///
86    /// # Examples
87    ///
88
89    /// let config = WorkerConfig {
90    ///     storage_path: "./jobs.db".to_string(),
91    ///     concurrency: 5,
92    ///     poll_interval_ms: 100,
93    ///     cleanup_interval_seconds: 3600,
94    /// };
95    ///
96    /// let workers = WorkerSystem::new(config)?;
97    /// ```ignore
98    pub fn new(config: WorkerConfig) -> Result<Self> {
99        let queue = Arc::new(JobQueue::new(config.storage_path.clone())?);
100        let executor = Arc::new(RwLock::new(WorkerExecutor::new(Arc::clone(&queue), config)));
101
102        Ok(Self { queue, executor })
103    }
104
105    /// Register a job handler
106    pub async fn register_handler<F, Fut>(&self, job_type: impl Into<String>, handler: F)
107    where
108        F: Fn(Job) -> Fut + Send + Sync + 'static,
109        Fut: std::future::Future<Output = Result<()>> + Send + 'static,
110    {
111        self.executor.read().await.register_handler(job_type, handler).await
112    }
113
114    /// Start the worker system
115    ///
116    /// Begins processing jobs from the queue. Jobs are executed concurrently
117    /// based on the `max_concurrent_jobs` configuration.
118    ///
119    /// # Examples
120    ///
121
122    /// let workers = WorkerSystem::new(config)?;
123    ///
124    /// // Register handlers first
125    /// workers.register_handler("task", handler).await;
126    ///
127    /// // Then start processing
128    /// workers.start().await?;
129    /// ```
130    pub async fn start(&self) -> Result<()> {
131        self.executor.write().await.start().await
132    }
133
134    /// Stop the worker system gracefully
135    ///
136    /// Waits for currently running jobs to complete before shutting down.
137    /// No new jobs will be picked up after calling this method.
138    ///
139    /// # Examples
140    ///
141
142    /// // Graceful shutdown
143    /// workers.stop().await?;
144    /// ```ignore
145    pub async fn stop(&self) -> Result<()> {
146        self.executor.write().await.stop().await
147    }
148
149    /// Enqueue a new job for processing
150    ///
151    /// Adds a job to the queue. It will be executed by a worker when available,
152    /// respecting priority and scheduling constraints.
153    ///
154    /// # Arguments
155    /// * `job` - The job to enqueue
156    ///
157    /// # Returns
158    /// The unique job ID for tracking status
159    ///
160    /// # Examples
161    ///
162
163    /// use serde_json::json;
164    ///
165    /// // Simple job
166    /// let job = Job::new("send_welcome_email")
167    ///     .add_field("user_id", json!("123"))
168    ///     .add_field("email", json!("user@example.com"));
169    ///
170    /// let job_id = workers.enqueue(job).await?;
171    ///
172    /// // High priority job
173    /// let urgent = Job::new("process_payment")
174    ///     .add_field("amount", json!(99.99))
175    ///     .with_priority(JobPriority::Critical)
176    ///     .with_timeout(30); // 30 seconds
177    ///
178    /// workers.enqueue(urgent).await?;
179    ///
180    /// // Scheduled job (runs in 1 hour)
181    /// let scheduled = Job::new("send_reminder")
182    ///     .add_field("message", json!("Meeting in 1 hour"))
183    ///     .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
184    ///
185    /// workers.enqueue(scheduled).await?;
186    /// ```
187    pub async fn enqueue(&self, job: Job) -> Result<String> {
188        self.queue.enqueue(job).await
189    }
190
191    /// Get job status
192    ///
193    /// Retrieves the current status of a job by its ID.
194    ///
195    /// # Arguments
196    /// * `job_id` - The job ID returned from `enqueue()`
197    ///
198    /// # Returns
199    /// - `Some(JobStatus)` if job exists
200    /// - `None` if job not found
201    ///
202    /// # Examples
203    ///
204
205    /// let job_id = workers.enqueue(job).await?;
206    ///
207    /// // Check status later
208    /// if let Some(status) = workers.get_status(&job_id).await? {
209    ///     match status {
210    ///         JobStatus::Pending => println!("Waiting to run"),
211    ///         JobStatus::Running => println!("Currently executing"),
212    ///         JobStatus::Completed => println!("Done!"),
213    ///         JobStatus::Failed { error, retries } => {
214    ///             println!("Failed: {} (retries: {})", error, retries);
215    ///         },
216    ///         JobStatus::DeadLetter { error } => {
217    ///             println!("Permanently failed: {}", error);
218    ///         },
219    ///     }
220    /// }
221    /// ```ignore
222    pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
223        self.queue.get_status(job_id).await
224    }
225
226    /// Get queue statistics
227    ///
228    /// Returns counts of jobs in various states for monitoring.
229    ///
230    /// # Returns
231    /// `QueueStats` with pending, running, completed, failed, and dead letter counts
232    ///
233    /// # Examples
234    ///
235
236    /// let stats = workers.stats().await?;
237    ///
238    /// println!("Queue status:");
239    /// println!("  Pending: {}", stats.pending);
240    /// println!("  Running: {}", stats.running);
241    /// println!("  Completed: {}", stats.completed);
242    /// println!("  Failed: {}", stats.failed);
243    /// println!("  Dead letter: {}", stats.dead_letter);
244    ///
245    /// // Alert on high failure rate
246    /// let total = stats.completed + stats.failed;
247    /// if total > 0 {
248    ///     let failure_rate = stats.failed as f64 / total as f64;
249    ///     if failure_rate > 0.10 {
250    ///         alert!("High job failure rate: {:.1}%", failure_rate * 100.0);
251    ///     }
252    /// }
253    /// ```
254    pub async fn stats(&self) -> Result<QueueStats> {
255        self.queue.stats().await
256    }
257}
258
/// Point-in-time job counts per state, as returned by `WorkerSystem::stats`.
///
/// All fields are plain counters, so the type is `Copy`, defaults to all
/// zeros, and can be compared with `==` (handy in tests and monitoring code).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Number of jobs counted as pending.
    pub pending: usize,
    /// Number of jobs counted as running.
    pub running: usize,
    /// Number of jobs counted as completed.
    pub completed: usize,
    /// Number of jobs counted as failed.
    pub failed: usize,
    /// Number of jobs counted in the dead letter queue.
    pub dead_letter: usize,
}