//! aurora_db/workers — Durable Workers: background job processing with persistence.
//!
//! Provides a reliable background job system with:
//! - Job persistence across restarts
//! - Retry logic with exponential backoff
//! - Job scheduling and delayed execution
//! - Dead letter queue for failed jobs
//! - Job status tracking
10pub mod executor;
11pub mod job;
12pub mod queue;
13
14pub use executor::{WorkerConfig, WorkerExecutor};
15pub use job::{Job, JobPriority, JobStatus};
16pub use queue::JobQueue;
17
18use crate::error::Result;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
22/// Main worker system for durable background job processing
23///
24/// Provides a reliable background job system with job persistence, retry logic,
25/// scheduling, and status tracking. Perfect for email sending, image processing,
26/// report generation, and any task that should survive restarts.
27///
28/// # Features
29/// - Job persistence across restarts
30/// - Exponential backoff retry logic
31/// - Job scheduling and delayed execution
32/// - Dead letter queue for failed jobs
33/// - Priority-based job execution
34/// - Concurrent job processing
35/// - Job status tracking
36///
37/// # Examples
38///
39/// ```ignore
40/// use aurora_db::workers::{WorkerSystem, WorkerConfig, Job, JobPriority};
41/// use serde_json::json;
42///
43/// // Create worker system
44/// let config = WorkerConfig {
45///     storage_path: "./workers.db".to_string(),
46///     max_concurrent_jobs: 10,
47///     poll_interval_ms: 100,
48/// };
49///
50/// let workers = WorkerSystem::new(config)?;
51///
52/// // Register job handlers
53/// workers.register_handler("send_email", |job| async move {
54///     let to = job.payload.get("to").and_then(|v| v.as_str()).unwrap();
55///     let subject = job.payload.get("subject").and_then(|v| v.as_str()).unwrap();
56///
57///     // Send email
58///     send_email(to, subject).await?;
59///     Ok(())
60/// }).await;
61///
62/// // Start processing jobs
63/// workers.start().await?;
64///
65/// // Enqueue a job
66/// let job = Job::new("send_email")
67///     .add_field("to", json!("user@example.com"))
68///     .add_field("subject", json!("Welcome!"))
69///     .with_priority(JobPriority::High);
70///
71/// let job_id = workers.enqueue(job).await?;
72/// println!("Enqueued job: {}", job_id);
73/// ```
74pub struct WorkerSystem {
75    queue: Arc<JobQueue>,
76    executor: Arc<RwLock<WorkerExecutor>>,
77}
78
79impl WorkerSystem {
80    /// Create a new worker system
81    ///
82    /// # Arguments
83    /// * `config` - Worker configuration including storage path and concurrency settings
84    ///
85    /// # Examples
86    ///
87
88    /// let config = WorkerConfig {
89    ///     storage_path: "./jobs.db".to_string(),
90    ///     max_concurrent_jobs: 5,
91    ///     poll_interval_ms: 100,
92    /// };
93    ///
94    /// let workers = WorkerSystem::new(config)?;
95    /// ```ignore
96    pub fn new(config: WorkerConfig) -> Result<Self> {
97        let queue = Arc::new(JobQueue::new(config.storage_path.clone())?);
98        let executor = Arc::new(RwLock::new(WorkerExecutor::new(Arc::clone(&queue), config)));
99
100        Ok(Self { queue, executor })
101    }
102
103    /// Register a job handler
104    pub async fn register_handler<F, Fut>(&self, job_type: impl Into<String>, handler: F)
105    where
106        F: Fn(Job) -> Fut + Send + Sync + 'static,
107        Fut: std::future::Future<Output = Result<()>> + Send + 'static,
108    {
109        self.executor.read().await.register_handler(job_type, handler).await
110    }
111
112    /// Start the worker system
113    ///
114    /// Begins processing jobs from the queue. Jobs are executed concurrently
115    /// based on the `max_concurrent_jobs` configuration.
116    ///
117    /// # Examples
118    ///
119
120    /// let workers = WorkerSystem::new(config)?;
121    ///
122    /// // Register handlers first
123    /// workers.register_handler("task", handler).await;
124    ///
125    /// // Then start processing
126    /// workers.start().await?;
127    /// ```
128    pub async fn start(&self) -> Result<()> {
129        self.executor.write().await.start().await
130    }
131
132    /// Stop the worker system gracefully
133    ///
134    /// Waits for currently running jobs to complete before shutting down.
135    /// No new jobs will be picked up after calling this method.
136    ///
137    /// # Examples
138    ///
139
140    /// // Graceful shutdown
141    /// workers.stop().await?;
142    /// ```ignore
143    pub async fn stop(&self) -> Result<()> {
144        self.executor.write().await.stop().await
145    }
146
147    /// Enqueue a new job for processing
148    ///
149    /// Adds a job to the queue. It will be executed by a worker when available,
150    /// respecting priority and scheduling constraints.
151    ///
152    /// # Arguments
153    /// * `job` - The job to enqueue
154    ///
155    /// # Returns
156    /// The unique job ID for tracking status
157    ///
158    /// # Examples
159    ///
160
161    /// use serde_json::json;
162    ///
163    /// // Simple job
164    /// let job = Job::new("send_welcome_email")
165    ///     .add_field("user_id", json!("123"))
166    ///     .add_field("email", json!("user@example.com"));
167    ///
168    /// let job_id = workers.enqueue(job).await?;
169    ///
170    /// // High priority job
171    /// let urgent = Job::new("process_payment")
172    ///     .add_field("amount", json!(99.99))
173    ///     .with_priority(JobPriority::Critical)
174    ///     .with_timeout(30); // 30 seconds
175    ///
176    /// workers.enqueue(urgent).await?;
177    ///
178    /// // Scheduled job (runs in 1 hour)
179    /// let scheduled = Job::new("send_reminder")
180    ///     .add_field("message", json!("Meeting in 1 hour"))
181    ///     .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
182    ///
183    /// workers.enqueue(scheduled).await?;
184    /// ```
185    pub async fn enqueue(&self, job: Job) -> Result<String> {
186        self.queue.enqueue(job).await
187    }
188
189    /// Get job status
190    ///
191    /// Retrieves the current status of a job by its ID.
192    ///
193    /// # Arguments
194    /// * `job_id` - The job ID returned from `enqueue()`
195    ///
196    /// # Returns
197    /// - `Some(JobStatus)` if job exists
198    /// - `None` if job not found
199    ///
200    /// # Examples
201    ///
202
203    /// let job_id = workers.enqueue(job).await?;
204    ///
205    /// // Check status later
206    /// if let Some(status) = workers.get_status(&job_id).await? {
207    ///     match status {
208    ///         JobStatus::Pending => println!("Waiting to run"),
209    ///         JobStatus::Running => println!("Currently executing"),
210    ///         JobStatus::Completed => println!("Done!"),
211    ///         JobStatus::Failed { error, retries } => {
212    ///             println!("Failed: {} (retries: {})", error, retries);
213    ///         },
214    ///         JobStatus::DeadLetter { error } => {
215    ///             println!("Permanently failed: {}", error);
216    ///         },
217    ///     }
218    /// }
219    /// ```ignore
220    pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
221        self.queue.get_status(job_id).await
222    }
223
224    /// Get queue statistics
225    ///
226    /// Returns counts of jobs in various states for monitoring.
227    ///
228    /// # Returns
229    /// `QueueStats` with pending, running, completed, failed, and dead letter counts
230    ///
231    /// # Examples
232    ///
233
234    /// let stats = workers.stats().await?;
235    ///
236    /// println!("Queue status:");
237    /// println!("  Pending: {}", stats.pending);
238    /// println!("  Running: {}", stats.running);
239    /// println!("  Completed: {}", stats.completed);
240    /// println!("  Failed: {}", stats.failed);
241    /// println!("  Dead letter: {}", stats.dead_letter);
242    ///
243    /// // Alert on high failure rate
244    /// let total = stats.completed + stats.failed;
245    /// if total > 0 {
246    ///     let failure_rate = stats.failed as f64 / total as f64;
247    ///     if failure_rate > 0.10 {
248    ///         alert!("High job failure rate: {:.1}%", failure_rate * 100.0);
249    ///     }
250    /// }
251    /// ```
252    pub async fn stats(&self) -> Result<QueueStats> {
253        self.queue.stats().await
254    }
255}
256
/// Snapshot of job counts by state, as returned by `WorkerSystem::stats`.
///
/// Plain-old-data: all fields are `usize` counters, so the type is `Copy`
/// and `Default` (all zeros) for cheap aggregation in monitoring code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct QueueStats {
    /// Jobs waiting to be picked up by a worker.
    pub pending: usize,
    /// Jobs currently executing.
    pub running: usize,
    /// Jobs that finished successfully.
    pub completed: usize,
    /// Jobs that failed; presumably still eligible for retry — TODO confirm against queue logic.
    pub failed: usize,
    /// Jobs moved to the dead letter queue after permanent failure.
    pub dead_letter: usize,
}