Skip to main content

aurora_db/workers/
mod.rs

1// Durable Workers - Background job processing with persistence
2//
3// Provides a reliable background job system with:
4// - Job persistence across restarts
5// - Retry logic with exponential backoff
6// - Job scheduling and delayed execution
7// - Dead letter queue for failed jobs
8// - Job status tracking
9
10pub mod executor;
11pub mod job;
12pub mod queue;
13
14pub use executor::{WorkerConfig, WorkerExecutor};
15pub use job::{Job, JobPriority, JobStatus};
16pub use queue::JobQueue;
17
18use crate::error::Result;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
/// Main worker system for durable background job processing.
///
/// A `WorkerSystem` owns a shared [`JobQueue`] and a [`WorkerExecutor`] that is
/// guarded by an async `RwLock`. Jobs are submitted through the queue and run by
/// the executor. It is intended for email sending, image processing, report
/// generation, and any task that should survive restarts.
///
/// # Features
/// - Job persistence across restarts
/// - Exponential backoff retry logic
/// - Job scheduling and delayed execution
/// - Dead letter queue for failed jobs
/// - Priority-based job execution
/// - Concurrent job processing
/// - Job status tracking
///
/// # Examples
///
/// ```rust,ignore
/// use aurora_db::workers::{WorkerSystem, WorkerConfig, Job, JobPriority};
/// use serde_json::json;
///
/// // Create worker system
/// let config = WorkerConfig {
///     storage_path: "./workers.db".to_string(),
///     max_concurrent_jobs: 10,
///     poll_interval_ms: 100,
/// };
///
/// let workers = WorkerSystem::new(config)?;
///
/// // Register job handlers
/// workers.register_handler("send_email", |job| async move {
///     let to = job.payload.get("to").and_then(|v| v.as_str()).unwrap();
///     let subject = job.payload.get("subject").and_then(|v| v.as_str()).unwrap();
///
///     // Send email
///     send_email(to, subject).await?;
///     Ok(())
/// }).await;
///
/// // Start processing jobs
/// workers.start().await?;
///
/// // Enqueue a job
/// let job = Job::new("send_email")
///     .add_field("to", json!("user@example.com"))
///     .add_field("subject", json!("Welcome!"))
///     .with_priority(JobPriority::High);
///
/// let job_id = workers.enqueue(job).await?;
/// println!("Enqueued job: {}", job_id);
/// ```
// The example above is fenced as `ignore` rather than `no_run`: `no_run` snippets
// are still compiled by rustdoc, and this one uses `?`/`.await` at the top level
// and an undefined `send_email`, so it is illustrative only.
// NOTE(review): `register_handler` is not among the methods of the
// `impl WorkerSystem` in this module - confirm where handlers are registered
// (presumably on `WorkerExecutor`) and adjust the example if needed.
pub struct WorkerSystem {
    // Queue shared with the executor; `enqueue`, `get_status` and `stats` go through it.
    queue: Arc<JobQueue>,
    // Executor behind an async lock; `start` and `stop` take the write lock.
    executor: Arc<RwLock<WorkerExecutor>>,
}
78
impl WorkerSystem {
    /// Create a new worker system.
    ///
    /// Builds a [`JobQueue`] from `config.storage_path`, then a
    /// [`WorkerExecutor`] that shares that queue and takes ownership of `config`.
    ///
    /// # Arguments
    /// * `config` - Worker configuration including storage path and concurrency settings
    ///
    /// # Errors
    /// Returns the error produced by `JobQueue::new` if the queue cannot be created.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// let config = WorkerConfig {
    ///     storage_path: "./jobs.db".to_string(),
    ///     max_concurrent_jobs: 5,
    ///     poll_interval_ms: 100,
    /// };
    ///
    /// let workers = WorkerSystem::new(config)?;
    /// ```
    pub fn new(config: WorkerConfig) -> Result<Self> {
        // The path is cloned because `config` is moved into the executor below.
        let queue = Arc::new(JobQueue::new(config.storage_path.clone())?);
        let executor = Arc::new(RwLock::new(WorkerExecutor::new(Arc::clone(&queue), config)));

        Ok(Self { queue, executor })
    }

    /// Start the worker system.
    ///
    /// Begins processing jobs from the queue. Jobs are executed concurrently
    /// based on the `max_concurrent_jobs` configuration.
    ///
    /// This takes the executor's write lock and holds it until
    /// `WorkerExecutor::start` returns.
    ///
    /// # Errors
    /// Propagates any error returned by `WorkerExecutor::start`.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// let workers = WorkerSystem::new(config)?;
    ///
    /// // Register handlers first
    /// workers.register_handler("task", handler).await;
    ///
    /// // Then start processing
    /// workers.start().await?;
    /// ```
    pub async fn start(&self) -> Result<()> {
        self.executor.write().await.start().await
    }

    /// Stop the worker system gracefully.
    ///
    /// Waits for currently running jobs to complete before shutting down.
    /// No new jobs will be picked up after calling this method.
    ///
    /// This takes the executor's write lock and holds it until
    /// `WorkerExecutor::stop` returns.
    ///
    /// # Errors
    /// Propagates any error returned by `WorkerExecutor::stop`.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// // Graceful shutdown
    /// workers.stop().await?;
    /// ```
    pub async fn stop(&self) -> Result<()> {
        self.executor.write().await.stop().await
    }

    /// Enqueue a new job for processing.
    ///
    /// Adds a job to the queue. It will be executed by a worker when available,
    /// respecting priority and scheduling constraints.
    ///
    /// # Arguments
    /// * `job` - The job to enqueue
    ///
    /// # Returns
    /// The unique job ID for tracking status
    ///
    /// # Errors
    /// Propagates any error returned by `JobQueue::enqueue`.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use serde_json::json;
    ///
    /// // Simple job
    /// let job = Job::new("send_welcome_email")
    ///     .add_field("user_id", json!("123"))
    ///     .add_field("email", json!("user@example.com"));
    ///
    /// let job_id = workers.enqueue(job).await?;
    ///
    /// // High priority job
    /// let urgent = Job::new("process_payment")
    ///     .add_field("amount", json!(99.99))
    ///     .with_priority(JobPriority::Critical)
    ///     .with_timeout(30); // 30 seconds
    ///
    /// workers.enqueue(urgent).await?;
    ///
    /// // Scheduled job (runs in 1 hour)
    /// let scheduled = Job::new("send_reminder")
    ///     .add_field("message", json!("Meeting in 1 hour"))
    ///     .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
    ///
    /// workers.enqueue(scheduled).await?;
    /// ```
    pub async fn enqueue(&self, job: Job) -> Result<String> {
        self.queue.enqueue(job).await
    }

    /// Get job status.
    ///
    /// Retrieves the current status of a job by its ID.
    ///
    /// # Arguments
    /// * `job_id` - The job ID returned from `enqueue()`
    ///
    /// # Returns
    /// - `Ok(Some(JobStatus))` if the job exists
    /// - `Ok(None)` if the job is not found
    ///
    /// # Errors
    /// Propagates any error returned by `JobQueue::get_status`.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// let job_id = workers.enqueue(job).await?;
    ///
    /// // Check status later
    /// if let Some(status) = workers.get_status(&job_id).await? {
    ///     match status {
    ///         JobStatus::Pending => println!("Waiting to run"),
    ///         JobStatus::Running => println!("Currently executing"),
    ///         JobStatus::Completed => println!("Done!"),
    ///         JobStatus::Failed { error, retries } => {
    ///             println!("Failed: {} (retries: {})", error, retries);
    ///         },
    ///         JobStatus::DeadLetter { error } => {
    ///             println!("Permanently failed: {}", error);
    ///         },
    ///     }
    /// }
    /// ```
    pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
        self.queue.get_status(job_id).await
    }

    /// Get queue statistics.
    ///
    /// Returns counts of jobs in various states for monitoring.
    ///
    /// # Returns
    /// `QueueStats` with pending, running, completed, failed, and dead letter counts
    ///
    /// # Errors
    /// Propagates any error returned by `JobQueue::stats`.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// let stats = workers.stats().await?;
    ///
    /// println!("Queue status:");
    /// println!("  Pending: {}", stats.pending);
    /// println!("  Running: {}", stats.running);
    /// println!("  Completed: {}", stats.completed);
    /// println!("  Failed: {}", stats.failed);
    /// println!("  Dead letter: {}", stats.dead_letter);
    ///
    /// // Alert on high failure rate
    /// let total = stats.completed + stats.failed;
    /// if total > 0 {
    ///     let failure_rate = stats.failed as f64 / total as f64;
    ///     if failure_rate > 0.10 {
    ///         eprintln!("High job failure rate: {:.1}%", failure_rate * 100.0);
    ///     }
    /// }
    /// ```
    pub async fn stats(&self) -> Result<QueueStats> {
        self.queue.stats().await
    }
}
247
/// Snapshot of how many jobs are in each state, as returned by
/// `WorkerSystem::stats`.
///
/// `Default` yields an all-zero snapshot, and `PartialEq`/`Eq` allow two
/// snapshots to be compared directly (useful in tests and change detection).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Number of jobs counted as pending.
    pub pending: usize,
    /// Number of jobs counted as running.
    pub running: usize,
    /// Number of jobs counted as completed.
    pub completed: usize,
    /// Number of jobs counted as failed.
    pub failed: usize,
    /// Number of jobs counted in the dead letter queue.
    pub dead_letter: usize,
}
255}