Skip to main content

aurora_db/workers/
mod.rs

//! Durable Workers - background job processing with persistence.
//!
//! Provides a reliable background job system with:
//! - Job persistence across restarts
//! - Retry logic with exponential backoff
//! - Job scheduling and delayed execution
//! - Dead letter queue for failed jobs
//! - Job status tracking
9
10pub mod executor;
11pub mod job;
12pub mod queue;
13
14pub use executor::{WorkerConfig, WorkerExecutor};
15pub use job::{Job, JobPriority, JobStatus};
16pub use queue::JobQueue;
17
18use crate::error::Result;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
/// Main worker system for durable background job processing.
///
/// Bundles a persistent [`JobQueue`] with a [`WorkerExecutor`] constructed
/// over that same queue. Provides job persistence, retry logic, scheduling,
/// and status tracking. Suited to email sending, image processing, report
/// generation, and any task that should survive restarts.
///
/// # Features
/// - Job persistence across restarts
/// - Exponential backoff retry logic
/// - Job scheduling and delayed execution
/// - Dead letter queue for failed jobs
/// - Priority-based job execution
/// - Concurrent job processing
/// - Job status tracking
///
/// # Examples
///
/// ```ignore
/// use aurora_db::workers::{WorkerSystem, WorkerConfig, Job, JobPriority};
/// use serde_json::json;
///
/// // Create worker system
/// let config = WorkerConfig {
///     storage_path: "./workers.db".to_string(),
///     concurrency: 10,
///     poll_interval_ms: 100,
///     cleanup_interval_seconds: 3600,
/// };
///
/// let workers = WorkerSystem::new(config)?;
///
/// // Register job handlers
/// workers.register_handler("send_email", |job| async move {
///     let to = job.payload.get("to").and_then(|v| v.as_str()).unwrap();
///     let subject = job.payload.get("subject").and_then(|v| v.as_str()).unwrap();
///
///     // Send email
///     send_email(to, subject).await?;
///     Ok(())
/// }).await;
///
/// // Start processing jobs
/// workers.start().await?;
///
/// // Enqueue a job
/// let job = Job::new("send_email")
///     .add_field("to", json!("user@example.com"))
///     .add_field("subject", json!("Welcome!"))
///     .with_priority(JobPriority::High);
///
/// let job_id = workers.enqueue(job).await?;
/// println!("Enqueued job: {}", job_id);
/// ```
pub struct WorkerSystem {
    /// Persistent job queue. The same `Arc` is handed to the executor in
    /// `new`, so `enqueue` and the executor operate on one queue instance.
    queue: Arc<JobQueue>,
    /// Job executor, kept behind an async (`tokio`) `RwLock`: `start` and
    /// `stop` take the write lock, while `register_handler` only takes a read
    /// lock.
    executor: Arc<RwLock<WorkerExecutor>>,
}
79
80impl WorkerSystem {
81    /// Create a new worker system
82    ///
83    /// # Arguments
84    /// * `config` - Worker configuration including storage path and concurrency settings
85    ///
86    /// # Examples
87    ///
88
89    /// let config = WorkerConfig {
90    ///     storage_path: "./jobs.db".to_string(),
91    ///     concurrency: 5,
92    ///     poll_interval_ms: 100,
93    ///     cleanup_interval_seconds: 3600,
94    /// };
95    ///
96    /// let workers = WorkerSystem::new(config)?;
97    /// ```ignore
98    pub fn new(config: WorkerConfig) -> Result<Self> {
99        let queue = Arc::new(JobQueue::new(config.storage_path.clone())?);
100        let executor = Arc::new(RwLock::new(WorkerExecutor::new(Arc::clone(&queue), config)));
101
102        Ok(Self { queue, executor })
103    }
104
105    /// Register a job handler
106    pub async fn register_handler<F, Fut>(&self, job_type: impl Into<String>, handler: F)
107    where
108        F: Fn(Job) -> Fut + Send + Sync + 'static,
109        Fut: std::future::Future<Output = Result<()>> + Send + 'static,
110    {
111        self.executor
112            .read()
113            .await
114            .register_handler(job_type, handler)
115            .await
116    }
117
118    /// Start the worker system
119    ///
120    /// Begins processing jobs from the queue. Jobs are executed concurrently
121    /// based on the `max_concurrent_jobs` configuration.
122    ///
123    /// # Examples
124    ///
125
126    /// let workers = WorkerSystem::new(config)?;
127    ///
128    /// // Register handlers first
129    /// workers.register_handler("task", handler).await;
130    ///
131    /// // Then start processing
132    /// workers.start().await?;
133    /// ```
134    pub async fn start(&self) -> Result<()> {
135        self.executor.write().await.start().await
136    }
137
138    /// Stop the worker system gracefully
139    ///
140    /// Waits for currently running jobs to complete before shutting down.
141    /// No new jobs will be picked up after calling this method.
142    ///
143    /// # Examples
144    ///
145
146    /// // Graceful shutdown
147    /// workers.stop().await?;
148    /// ```ignore
149    pub async fn stop(&self) -> Result<()> {
150        self.executor.write().await.stop().await
151    }
152
153    /// Enqueue a new job for processing
154    ///
155    /// Adds a job to the queue. It will be executed by a worker when available,
156    /// respecting priority and scheduling constraints.
157    ///
158    /// # Arguments
159    /// * `job` - The job to enqueue
160    ///
161    /// # Returns
162    /// The unique job ID for tracking status
163    ///
164    /// # Examples
165    ///
166
167    /// use serde_json::json;
168    ///
169    /// // Simple job
170    /// let job = Job::new("send_welcome_email")
171    ///     .add_field("user_id", json!("123"))
172    ///     .add_field("email", json!("user@example.com"));
173    ///
174    /// let job_id = workers.enqueue(job).await?;
175    ///
176    /// // High priority job
177    /// let urgent = Job::new("process_payment")
178    ///     .add_field("amount", json!(99.99))
179    ///     .with_priority(JobPriority::Critical)
180    ///     .with_timeout(30); // 30 seconds
181    ///
182    /// workers.enqueue(urgent).await?;
183    ///
184    /// // Scheduled job (runs in 1 hour)
185    /// let scheduled = Job::new("send_reminder")
186    ///     .add_field("message", json!("Meeting in 1 hour"))
187    ///     .scheduled_at(chrono::Utc::now() + chrono::Duration::hours(1));
188    ///
189    /// workers.enqueue(scheduled).await?;
190    /// ```
191    pub async fn enqueue(&self, job: Job) -> Result<String> {
192        self.queue.enqueue(job).await
193    }
194
195    /// Get job status
196    ///
197    /// Retrieves the current status of a job by its ID.
198    ///
199    /// # Arguments
200    /// * `job_id` - The job ID returned from `enqueue()`
201    ///
202    /// # Returns
203    /// - `Some(JobStatus)` if job exists
204    /// - `None` if job not found
205    ///
206    /// # Examples
207    ///
208
209    /// let job_id = workers.enqueue(job).await?;
210    ///
211    /// // Check status later
212    /// if let Some(status) = workers.get_status(&job_id).await? {
213    ///     match status {
214    ///         JobStatus::Pending => println!("Waiting to run"),
215    ///         JobStatus::Running => println!("Currently executing"),
216    ///         JobStatus::Completed => println!("Done!"),
217    ///         JobStatus::Failed { error, retries } => {
218    ///             println!("Failed: {} (retries: {})", error, retries);
219    ///         },
220    ///         JobStatus::DeadLetter { error } => {
221    ///             println!("Permanently failed: {}", error);
222    ///         },
223    ///     }
224    /// }
225    /// ```ignore
226    pub async fn get_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
227        self.queue.get_status(job_id).await
228    }
229
230    /// Get queue statistics
231    ///
232    /// Returns counts of jobs in various states for monitoring.
233    ///
234    /// # Returns
235    /// `QueueStats` with pending, running, completed, failed, and dead letter counts
236    ///
237    /// # Examples
238    ///
239
240    /// let stats = workers.stats().await?;
241    ///
242    /// println!("Queue status:");
243    /// println!("  Pending: {}", stats.pending);
244    /// println!("  Running: {}", stats.running);
245    /// println!("  Completed: {}", stats.completed);
246    /// println!("  Failed: {}", stats.failed);
247    /// println!("  Dead letter: {}", stats.dead_letter);
248    ///
249    /// // Alert on high failure rate
250    /// let total = stats.completed + stats.failed;
251    /// if total > 0 {
252    ///     let failure_rate = stats.failed as f64 / total as f64;
253    ///     if failure_rate > 0.10 {
254    ///         alert!("High job failure rate: {:.1}%", failure_rate * 100.0);
255    ///     }
256    /// }
257    /// ```
258    pub async fn stats(&self) -> Result<QueueStats> {
259        self.queue.stats().await
260    }
261}
262
/// Snapshot of job counts per state, as returned by [`WorkerSystem::stats`].
///
/// Intended for monitoring and alerting (for example on a growing `failed`
/// or `dead_letter` count). `Default` yields an all-zero snapshot.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Number of pending jobs.
    pub pending: usize,
    /// Number of running jobs.
    pub running: usize,
    /// Number of completed jobs.
    pub completed: usize,
    /// Number of failed jobs.
    pub failed: usize,
    /// Number of jobs in the dead letter queue.
    pub dead_letter: usize,
}