1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use forge_core::function::JobDispatch;
7use forge_core::job::{ForgeJob, JobInfo, JobPriority};
8use uuid::Uuid;
9
10use super::queue::{JobQueue, JobRecord};
11use super::registry::JobRegistry;
12
13#[derive(Clone)]
15pub struct JobDispatcher {
16 queue: JobQueue,
17 registry: JobRegistry,
18}
19
20impl JobDispatcher {
21 pub fn new(queue: JobQueue, registry: JobRegistry) -> Self {
23 Self { queue, registry }
24 }
25
26 pub async fn dispatch<J: ForgeJob>(&self, args: J::Args) -> Result<Uuid, forge_core::ForgeError>
28 where
29 J::Args: serde::Serialize,
30 {
31 let info = J::info();
32 self.dispatch_with_info(&info, serde_json::to_value(args)?, None)
33 .await
34 }
35
36 pub async fn dispatch_in<J: ForgeJob>(
38 &self,
39 delay: Duration,
40 args: J::Args,
41 ) -> Result<Uuid, forge_core::ForgeError>
42 where
43 J::Args: serde::Serialize,
44 {
45 let info = J::info();
46 let scheduled_at = Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
47 self.dispatch_at_with_info(&info, serde_json::to_value(args)?, scheduled_at)
48 .await
49 }
50
51 pub async fn dispatch_at<J: ForgeJob>(
53 &self,
54 at: DateTime<Utc>,
55 args: J::Args,
56 ) -> Result<Uuid, forge_core::ForgeError>
57 where
58 J::Args: serde::Serialize,
59 {
60 let info = J::info();
61 self.dispatch_at_with_info(&info, serde_json::to_value(args)?, at)
62 .await
63 }
64
65 pub async fn dispatch_by_name(
67 &self,
68 job_type: &str,
69 args: serde_json::Value,
70 owner_subject: Option<String>,
71 ) -> Result<Uuid, forge_core::ForgeError> {
72 let entry = self.registry.get(job_type).ok_or_else(|| {
73 forge_core::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
74 })?;
75
76 self.dispatch_with_info(&entry.info, args, owner_subject)
77 .await
78 }
79
80 async fn dispatch_with_info(
82 &self,
83 info: &JobInfo,
84 args: serde_json::Value,
85 owner_subject: Option<String>,
86 ) -> Result<Uuid, forge_core::ForgeError> {
87 let mut job = JobRecord::new(
88 info.name,
89 args,
90 info.priority,
91 info.retry.max_attempts as i32,
92 )
93 .with_owner_subject(owner_subject);
94
95 if let Some(cap) = info.worker_capability {
96 job = job.with_capability(cap);
97 }
98
99 self.queue
100 .enqueue(job)
101 .await
102 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
103 }
104
105 pub async fn cancel(
107 &self,
108 job_id: Uuid,
109 reason: Option<&str>,
110 ) -> Result<bool, forge_core::ForgeError> {
111 self.queue
112 .request_cancel(job_id, reason)
113 .await
114 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
115 }
116
117 async fn dispatch_at_with_info(
119 &self,
120 info: &JobInfo,
121 args: serde_json::Value,
122 scheduled_at: DateTime<Utc>,
123 ) -> Result<Uuid, forge_core::ForgeError> {
124 let mut job = JobRecord::new(
125 info.name,
126 args,
127 info.priority,
128 info.retry.max_attempts as i32,
129 )
130 .with_scheduled_at(scheduled_at);
131
132 if let Some(cap) = info.worker_capability {
133 job = job.with_capability(cap);
134 }
135
136 self.queue
137 .enqueue(job)
138 .await
139 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
140 }
141
142 pub async fn dispatch_idempotent<J: ForgeJob>(
144 &self,
145 idempotency_key: impl Into<String>,
146 args: J::Args,
147 ) -> Result<Uuid, forge_core::ForgeError>
148 where
149 J::Args: serde::Serialize,
150 {
151 let info = J::info();
152 let mut job = JobRecord::new(
153 info.name,
154 serde_json::to_value(args)?,
155 info.priority,
156 info.retry.max_attempts as i32,
157 )
158 .with_idempotency_key(idempotency_key);
159
160 if let Some(cap) = info.worker_capability {
161 job = job.with_capability(cap);
162 }
163
164 self.queue
165 .enqueue(job)
166 .await
167 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
168 }
169
170 pub async fn dispatch_with_priority<J: ForgeJob>(
172 &self,
173 priority: JobPriority,
174 args: J::Args,
175 ) -> Result<Uuid, forge_core::ForgeError>
176 where
177 J::Args: serde::Serialize,
178 {
179 let info = J::info();
180 let mut job = JobRecord::new(
181 info.name,
182 serde_json::to_value(args)?,
183 priority,
184 info.retry.max_attempts as i32,
185 );
186
187 if let Some(cap) = info.worker_capability {
188 job = job.with_capability(cap);
189 }
190
191 self.queue
192 .enqueue(job)
193 .await
194 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
195 }
196}
197
198impl JobDispatch for JobDispatcher {
199 fn get_info(&self, job_type: &str) -> Option<JobInfo> {
200 self.registry.get(job_type).map(|e| e.info.clone())
201 }
202
203 fn dispatch_by_name(
204 &self,
205 job_type: &str,
206 args: serde_json::Value,
207 owner_subject: Option<String>,
208 ) -> Pin<Box<dyn Future<Output = forge_core::Result<Uuid>> + Send + '_>> {
209 let job_type = job_type.to_string();
210 Box::pin(async move { self.dispatch_by_name(&job_type, args, owner_subject).await })
211 }
212
213 fn cancel(
214 &self,
215 job_id: Uuid,
216 reason: Option<String>,
217 ) -> Pin<Box<dyn Future<Output = forge_core::Result<bool>> + Send + '_>> {
218 Box::pin(async move { self.cancel(job_id, reason.as_deref()).await })
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use super::*;
225
226 #[tokio::test]
227 async fn test_dispatcher_creation() {
228 let pool = sqlx::postgres::PgPoolOptions::new()
229 .max_connections(1)
230 .connect_lazy("postgres://localhost/nonexistent")
231 .expect("Failed to create mock pool");
232 let queue = JobQueue::new(pool);
233 let registry = JobRegistry::new();
234 let _dispatcher = JobDispatcher::new(queue, registry);
235 }
236}