forge_runtime/jobs/
dispatcher.rs1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use forge_core::function::JobDispatch;
7use forge_core::job::{ForgeJob, JobInfo, JobPriority};
8use uuid::Uuid;
9
10use super::queue::{JobQueue, JobRecord};
11use super::registry::JobRegistry;
12
13#[derive(Clone)]
15pub struct JobDispatcher {
16 queue: JobQueue,
17 registry: JobRegistry,
18}
19
20impl JobDispatcher {
21 pub fn new(queue: JobQueue, registry: JobRegistry) -> Self {
23 Self { queue, registry }
24 }
25
26 pub async fn dispatch<J: ForgeJob>(&self, args: J::Args) -> Result<Uuid, forge_core::ForgeError>
28 where
29 J::Args: serde::Serialize,
30 {
31 let info = J::info();
32 self.dispatch_with_info(&info, serde_json::to_value(args)?)
33 .await
34 }
35
36 pub async fn dispatch_in<J: ForgeJob>(
38 &self,
39 delay: Duration,
40 args: J::Args,
41 ) -> Result<Uuid, forge_core::ForgeError>
42 where
43 J::Args: serde::Serialize,
44 {
45 let info = J::info();
46 let scheduled_at = Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
47 self.dispatch_at_with_info(&info, serde_json::to_value(args)?, scheduled_at)
48 .await
49 }
50
51 pub async fn dispatch_at<J: ForgeJob>(
53 &self,
54 at: DateTime<Utc>,
55 args: J::Args,
56 ) -> Result<Uuid, forge_core::ForgeError>
57 where
58 J::Args: serde::Serialize,
59 {
60 let info = J::info();
61 self.dispatch_at_with_info(&info, serde_json::to_value(args)?, at)
62 .await
63 }
64
65 pub async fn dispatch_by_name(
67 &self,
68 job_type: &str,
69 args: serde_json::Value,
70 ) -> Result<Uuid, forge_core::ForgeError> {
71 let entry = self.registry.get(job_type).ok_or_else(|| {
72 forge_core::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
73 })?;
74
75 self.dispatch_with_info(&entry.info, args).await
76 }
77
78 async fn dispatch_with_info(
80 &self,
81 info: &JobInfo,
82 args: serde_json::Value,
83 ) -> Result<Uuid, forge_core::ForgeError> {
84 let mut job = JobRecord::new(
85 info.name,
86 args,
87 info.priority,
88 info.retry.max_attempts as i32,
89 );
90
91 if let Some(cap) = info.worker_capability {
92 job = job.with_capability(cap);
93 }
94
95 self.queue
96 .enqueue(job)
97 .await
98 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
99 }
100
101 pub async fn cancel(
103 &self,
104 job_id: Uuid,
105 reason: Option<&str>,
106 ) -> Result<bool, forge_core::ForgeError> {
107 self.queue
108 .request_cancel(job_id, reason)
109 .await
110 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
111 }
112
113 async fn dispatch_at_with_info(
115 &self,
116 info: &JobInfo,
117 args: serde_json::Value,
118 scheduled_at: DateTime<Utc>,
119 ) -> Result<Uuid, forge_core::ForgeError> {
120 let mut job = JobRecord::new(
121 info.name,
122 args,
123 info.priority,
124 info.retry.max_attempts as i32,
125 )
126 .with_scheduled_at(scheduled_at);
127
128 if let Some(cap) = info.worker_capability {
129 job = job.with_capability(cap);
130 }
131
132 self.queue
133 .enqueue(job)
134 .await
135 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
136 }
137
138 pub async fn dispatch_idempotent<J: ForgeJob>(
140 &self,
141 idempotency_key: impl Into<String>,
142 args: J::Args,
143 ) -> Result<Uuid, forge_core::ForgeError>
144 where
145 J::Args: serde::Serialize,
146 {
147 let info = J::info();
148 let mut job = JobRecord::new(
149 info.name,
150 serde_json::to_value(args)?,
151 info.priority,
152 info.retry.max_attempts as i32,
153 )
154 .with_idempotency_key(idempotency_key);
155
156 if let Some(cap) = info.worker_capability {
157 job = job.with_capability(cap);
158 }
159
160 self.queue
161 .enqueue(job)
162 .await
163 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
164 }
165
166 pub async fn dispatch_with_priority<J: ForgeJob>(
168 &self,
169 priority: JobPriority,
170 args: J::Args,
171 ) -> Result<Uuid, forge_core::ForgeError>
172 where
173 J::Args: serde::Serialize,
174 {
175 let info = J::info();
176 let mut job = JobRecord::new(
177 info.name,
178 serde_json::to_value(args)?,
179 priority,
180 info.retry.max_attempts as i32,
181 );
182
183 if let Some(cap) = info.worker_capability {
184 job = job.with_capability(cap);
185 }
186
187 self.queue
188 .enqueue(job)
189 .await
190 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
191 }
192}
193
194impl JobDispatch for JobDispatcher {
195 fn get_info(&self, job_type: &str) -> Option<JobInfo> {
196 self.registry.get(job_type).map(|e| e.info.clone())
197 }
198
199 fn dispatch_by_name(
200 &self,
201 job_type: &str,
202 args: serde_json::Value,
203 ) -> Pin<Box<dyn Future<Output = forge_core::Result<Uuid>> + Send + '_>> {
204 let job_type = job_type.to_string();
205 Box::pin(async move { self.dispatch_by_name(&job_type, args).await })
206 }
207
208 fn cancel(
209 &self,
210 job_id: Uuid,
211 reason: Option<String>,
212 ) -> Pin<Box<dyn Future<Output = forge_core::Result<bool>> + Send + '_>> {
213 Box::pin(async move { self.cancel(job_id, reason.as_deref()).await })
214 }
215}
216
217#[cfg(test)]
218mod tests {
219 use super::*;
220
221 #[tokio::test]
222 async fn test_dispatcher_creation() {
223 let pool = sqlx::postgres::PgPoolOptions::new()
224 .max_connections(1)
225 .connect_lazy("postgres://localhost/nonexistent")
226 .expect("Failed to create mock pool");
227 let queue = JobQueue::new(pool);
228 let registry = JobRegistry::new();
229 let _dispatcher = JobDispatcher::new(queue, registry);
230 }
231}