1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use forge_core::function::JobDispatch;
7use forge_core::job::{ForgeJob, JobInfo, JobPriority};
8use uuid::Uuid;
9
10use super::queue::{JobQueue, JobRecord};
11use super::registry::JobRegistry;
12
13#[derive(Clone)]
15pub struct JobDispatcher {
16 queue: JobQueue,
17 registry: JobRegistry,
18}
19
20impl JobDispatcher {
21 pub fn new(queue: JobQueue, registry: JobRegistry) -> Self {
23 Self { queue, registry }
24 }
25
26 pub async fn dispatch<J: ForgeJob>(&self, args: J::Args) -> Result<Uuid, forge_core::ForgeError>
28 where
29 J::Args: serde::Serialize,
30 {
31 let info = J::info();
32 self.dispatch_with_info(&info, serde_json::to_value(args)?, None)
33 .await
34 }
35
36 pub async fn dispatch_in<J: ForgeJob>(
38 &self,
39 delay: Duration,
40 args: J::Args,
41 ) -> Result<Uuid, forge_core::ForgeError>
42 where
43 J::Args: serde::Serialize,
44 {
45 let info = J::info();
46 let scheduled_at = Utc::now()
47 + chrono::Duration::from_std(delay)
48 .map_err(|_| forge_core::ForgeError::InvalidArgument("delay too large".into()))?;
49 self.dispatch_at_with_info(&info, serde_json::to_value(args)?, scheduled_at)
50 .await
51 }
52
53 pub async fn dispatch_at<J: ForgeJob>(
55 &self,
56 at: DateTime<Utc>,
57 args: J::Args,
58 ) -> Result<Uuid, forge_core::ForgeError>
59 where
60 J::Args: serde::Serialize,
61 {
62 let info = J::info();
63 self.dispatch_at_with_info(&info, serde_json::to_value(args)?, at)
64 .await
65 }
66
67 pub async fn dispatch_by_name(
69 &self,
70 job_type: &str,
71 args: serde_json::Value,
72 owner_subject: Option<String>,
73 ) -> Result<Uuid, forge_core::ForgeError> {
74 let entry = self.registry.get(job_type).ok_or_else(|| {
75 forge_core::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
76 })?;
77
78 self.dispatch_with_info(&entry.info, args, owner_subject)
79 .await
80 }
81
82 async fn dispatch_with_info(
84 &self,
85 info: &JobInfo,
86 args: serde_json::Value,
87 owner_subject: Option<String>,
88 ) -> Result<Uuid, forge_core::ForgeError> {
89 let mut job = JobRecord::new(
90 info.name,
91 args,
92 info.priority,
93 info.retry.max_attempts as i32,
94 )
95 .with_owner_subject(owner_subject);
96
97 if let Some(cap) = info.worker_capability {
98 job = job.with_capability(cap);
99 }
100
101 self.queue
102 .enqueue(job)
103 .await
104 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
105 }
106
107 pub async fn cancel(
109 &self,
110 job_id: Uuid,
111 reason: Option<&str>,
112 ) -> Result<bool, forge_core::ForgeError> {
113 self.queue
114 .request_cancel(job_id, reason)
115 .await
116 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
117 }
118
119 async fn dispatch_at_with_info(
121 &self,
122 info: &JobInfo,
123 args: serde_json::Value,
124 scheduled_at: DateTime<Utc>,
125 ) -> Result<Uuid, forge_core::ForgeError> {
126 let mut job = JobRecord::new(
127 info.name,
128 args,
129 info.priority,
130 info.retry.max_attempts as i32,
131 )
132 .with_scheduled_at(scheduled_at);
133
134 if let Some(cap) = info.worker_capability {
135 job = job.with_capability(cap);
136 }
137
138 self.queue
139 .enqueue(job)
140 .await
141 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
142 }
143
144 pub async fn dispatch_idempotent<J: ForgeJob>(
146 &self,
147 idempotency_key: impl Into<String>,
148 args: J::Args,
149 ) -> Result<Uuid, forge_core::ForgeError>
150 where
151 J::Args: serde::Serialize,
152 {
153 let info = J::info();
154 let mut job = JobRecord::new(
155 info.name,
156 serde_json::to_value(args)?,
157 info.priority,
158 info.retry.max_attempts as i32,
159 )
160 .with_idempotency_key(idempotency_key);
161
162 if let Some(cap) = info.worker_capability {
163 job = job.with_capability(cap);
164 }
165
166 self.queue
167 .enqueue(job)
168 .await
169 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
170 }
171
172 pub async fn dispatch_with_priority<J: ForgeJob>(
174 &self,
175 priority: JobPriority,
176 args: J::Args,
177 ) -> Result<Uuid, forge_core::ForgeError>
178 where
179 J::Args: serde::Serialize,
180 {
181 let info = J::info();
182 let mut job = JobRecord::new(
183 info.name,
184 serde_json::to_value(args)?,
185 priority,
186 info.retry.max_attempts as i32,
187 );
188
189 if let Some(cap) = info.worker_capability {
190 job = job.with_capability(cap);
191 }
192
193 self.queue
194 .enqueue(job)
195 .await
196 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
197 }
198}
199
200impl JobDispatch for JobDispatcher {
201 fn get_info(&self, job_type: &str) -> Option<JobInfo> {
202 self.registry.get(job_type).map(|e| e.info.clone())
203 }
204
205 fn dispatch_by_name(
206 &self,
207 job_type: &str,
208 args: serde_json::Value,
209 owner_subject: Option<String>,
210 ) -> Pin<Box<dyn Future<Output = forge_core::Result<Uuid>> + Send + '_>> {
211 let job_type = job_type.to_string();
212 Box::pin(async move { self.dispatch_by_name(&job_type, args, owner_subject).await })
213 }
214
215 fn cancel(
216 &self,
217 job_id: Uuid,
218 reason: Option<String>,
219 ) -> Pin<Box<dyn Future<Output = forge_core::Result<bool>> + Send + '_>> {
220 Box::pin(async move { self.cancel(job_id, reason.as_deref()).await })
221 }
222}
223
224#[cfg(test)]
225mod tests {
226 use super::*;
227
228 #[tokio::test]
229 async fn test_dispatcher_creation() {
230 let pool = sqlx::postgres::PgPoolOptions::new()
231 .max_connections(1)
232 .connect_lazy("postgres://localhost/nonexistent")
233 .expect("Failed to create mock pool");
234 let queue = JobQueue::new(pool);
235 let registry = JobRegistry::new();
236 let _dispatcher = JobDispatcher::new(queue, registry);
237 }
238}