Skip to main content

karbon_framework/job/
mod.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::sync::mpsc;
6
7/// Trait for background jobs.
8///
9/// ```ignore
10/// struct SendEmailJob { pub to: String, pub subject: String }
11///
12/// impl Job for SendEmailJob {
13///     fn name(&self) -> &str { "send_email" }
14///
15///     fn execute(&self) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + '_>> {
16///         Box::pin(async {
17///             // send email logic
18///             Ok(())
19///         })
20///     }
21/// }
22/// ```
23pub trait Job: Send + Sync + 'static {
24    fn name(&self) -> &str;
25    fn execute(&self) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + '_>>;
26
27    /// Number of retries on failure (default: 0)
28    fn max_retries(&self) -> u32 { 0 }
29
30    /// Delay between retries (default: 5s)
31    fn retry_delay(&self) -> Duration { Duration::from_secs(5) }
32}
33
34/// In-process background job queue using tokio tasks.
35///
36/// ```ignore
37/// let queue = JobQueue::new(4); // 4 workers
38/// queue.push(SendEmailJob { to: "foo@bar.com".into(), subject: "Hello".into() }).await;
39/// // job runs in background, no need to await
40///
41/// queue.shutdown().await; // wait for all jobs to finish
42/// ```
43pub struct JobQueue {
44    sender: mpsc::Sender<Arc<dyn Job>>,
45    shutdown: Option<tokio::task::JoinHandle<()>>,
46}
47
48impl JobQueue {
49    /// Create a new job queue with the given number of concurrent workers
50    pub fn new(workers: usize) -> Self {
51        let (sender, receiver) = mpsc::channel::<Arc<dyn Job>>(256);
52        let receiver = Arc::new(tokio::sync::Mutex::new(receiver));
53
54        let handle = tokio::spawn(async move {
55            let semaphore = Arc::new(tokio::sync::Semaphore::new(workers));
56            let recv = receiver;
57
58            loop {
59                let job = {
60                    let mut rx = recv.lock().await;
61                    rx.recv().await
62                };
63
64                let Some(job) = job else { break };
65                let sem = semaphore.clone();
66
67                tokio::spawn(async move {
68                    let Ok(_permit) = sem.acquire().await else { return };
69                    let name = job.name().to_string();
70                    let max_retries = job.max_retries();
71
72                    for attempt in 0..=max_retries {
73                        match job.execute().await {
74                            Ok(()) => {
75                                tracing::debug!(job = %name, "Job completed");
76                                return;
77                            }
78                            Err(e) => {
79                                if attempt < max_retries {
80                                    tracing::warn!(
81                                        job = %name,
82                                        attempt = attempt + 1,
83                                        max = max_retries,
84                                        error = %e,
85                                        "Job failed, retrying..."
86                                    );
87                                    tokio::time::sleep(job.retry_delay()).await;
88                                } else {
89                                    tracing::error!(
90                                        job = %name,
91                                        error = %e,
92                                        "Job failed after {} attempts",
93                                        max_retries + 1
94                                    );
95                                }
96                            }
97                        }
98                    }
99                });
100            }
101        });
102
103        Self {
104            sender,
105            shutdown: Some(handle),
106        }
107    }
108
109    /// Push a job to the queue for background execution
110    pub async fn push<J: Job>(&self, job: J) {
111        if self.sender.send(Arc::new(job)).await.is_err() {
112            tracing::error!("Job queue is closed");
113        }
114    }
115
116    /// Shut down the queue and wait for all pending jobs
117    pub async fn shutdown(mut self) {
118        drop(self.sender.clone());
119        if let Some(handle) = self.shutdown.take() {
120            let _ = handle.await;
121        }
122    }
123}