karbon_framework/job/mod.rs
1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::sync::mpsc;
6
7/// Trait for background jobs.
8///
9/// ```ignore
10/// struct SendEmailJob { pub to: String, pub subject: String }
11///
12/// impl Job for SendEmailJob {
13/// fn name(&self) -> &str { "send_email" }
14///
15/// fn execute(&self) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + '_>> {
16/// Box::pin(async {
17/// // send email logic
18/// Ok(())
19/// })
20/// }
21/// }
22/// ```
23pub trait Job: Send + Sync + 'static {
24 fn name(&self) -> &str;
25 fn execute(&self) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + '_>>;
26
27 /// Number of retries on failure (default: 0)
28 fn max_retries(&self) -> u32 { 0 }
29
30 /// Delay between retries (default: 5s)
31 fn retry_delay(&self) -> Duration { Duration::from_secs(5) }
32}
33
34/// In-process background job queue using tokio tasks.
35///
36/// ```ignore
37/// let queue = JobQueue::new(4); // 4 workers
38/// queue.push(SendEmailJob { to: "foo@bar.com".into(), subject: "Hello".into() }).await;
39/// // job runs in background, no need to await
40///
41/// queue.shutdown().await; // wait for all jobs to finish
42/// ```
43pub struct JobQueue {
44 sender: mpsc::Sender<Arc<dyn Job>>,
45 shutdown: Option<tokio::task::JoinHandle<()>>,
46}
47
48impl JobQueue {
49 /// Create a new job queue with the given number of concurrent workers
50 pub fn new(workers: usize) -> Self {
51 let (sender, receiver) = mpsc::channel::<Arc<dyn Job>>(256);
52 let receiver = Arc::new(tokio::sync::Mutex::new(receiver));
53
54 let handle = tokio::spawn(async move {
55 let semaphore = Arc::new(tokio::sync::Semaphore::new(workers));
56 let recv = receiver;
57
58 loop {
59 let job = {
60 let mut rx = recv.lock().await;
61 rx.recv().await
62 };
63
64 let Some(job) = job else { break };
65 let sem = semaphore.clone();
66
67 tokio::spawn(async move {
68 let Ok(_permit) = sem.acquire().await else { return };
69 let name = job.name().to_string();
70 let max_retries = job.max_retries();
71
72 for attempt in 0..=max_retries {
73 match job.execute().await {
74 Ok(()) => {
75 tracing::debug!(job = %name, "Job completed");
76 return;
77 }
78 Err(e) => {
79 if attempt < max_retries {
80 tracing::warn!(
81 job = %name,
82 attempt = attempt + 1,
83 max = max_retries,
84 error = %e,
85 "Job failed, retrying..."
86 );
87 tokio::time::sleep(job.retry_delay()).await;
88 } else {
89 tracing::error!(
90 job = %name,
91 error = %e,
92 "Job failed after {} attempts",
93 max_retries + 1
94 );
95 }
96 }
97 }
98 }
99 });
100 }
101 });
102
103 Self {
104 sender,
105 shutdown: Some(handle),
106 }
107 }
108
109 /// Push a job to the queue for background execution
110 pub async fn push<J: Job>(&self, job: J) {
111 if self.sender.send(Arc::new(job)).await.is_err() {
112 tracing::error!("Job queue is closed");
113 }
114 }
115
116 /// Shut down the queue and wait for all pending jobs
117 pub async fn shutdown(mut self) {
118 drop(self.sender.clone());
119 if let Some(handle) = self.shutdown.take() {
120 let _ = handle.await;
121 }
122 }
123}