forge_runtime/jobs/
dispatcher.rs1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use forge_core::function::JobDispatch;
7use forge_core::job::{ForgeJob, JobInfo, JobPriority};
8use uuid::Uuid;
9
10use super::queue::{JobQueue, JobRecord};
11use super::registry::JobRegistry;
12
13#[derive(Clone)]
15pub struct JobDispatcher {
16 queue: JobQueue,
17 registry: JobRegistry,
18}
19
20impl JobDispatcher {
21 pub fn new(queue: JobQueue, registry: JobRegistry) -> Self {
23 Self { queue, registry }
24 }
25
26 pub async fn dispatch<J: ForgeJob>(&self, args: J::Args) -> Result<Uuid, forge_core::ForgeError>
28 where
29 J::Args: serde::Serialize,
30 {
31 let info = J::info();
32 self.dispatch_with_info(&info, serde_json::to_value(args)?)
33 .await
34 }
35
36 pub async fn dispatch_in<J: ForgeJob>(
38 &self,
39 delay: Duration,
40 args: J::Args,
41 ) -> Result<Uuid, forge_core::ForgeError>
42 where
43 J::Args: serde::Serialize,
44 {
45 let info = J::info();
46 let scheduled_at = Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
47 self.dispatch_at_with_info(&info, serde_json::to_value(args)?, scheduled_at)
48 .await
49 }
50
51 pub async fn dispatch_at<J: ForgeJob>(
53 &self,
54 at: DateTime<Utc>,
55 args: J::Args,
56 ) -> Result<Uuid, forge_core::ForgeError>
57 where
58 J::Args: serde::Serialize,
59 {
60 let info = J::info();
61 self.dispatch_at_with_info(&info, serde_json::to_value(args)?, at)
62 .await
63 }
64
65 pub async fn dispatch_by_name(
67 &self,
68 job_type: &str,
69 args: serde_json::Value,
70 ) -> Result<Uuid, forge_core::ForgeError> {
71 let entry = self.registry.get(job_type).ok_or_else(|| {
72 forge_core::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
73 })?;
74
75 self.dispatch_with_info(&entry.info, args).await
76 }
77
78 async fn dispatch_with_info(
80 &self,
81 info: &JobInfo,
82 args: serde_json::Value,
83 ) -> Result<Uuid, forge_core::ForgeError> {
84 let mut job = JobRecord::new(
85 info.name,
86 args,
87 info.priority,
88 info.retry.max_attempts as i32,
89 );
90
91 if let Some(cap) = info.worker_capability {
92 job = job.with_capability(cap);
93 }
94
95 self.queue
96 .enqueue(job)
97 .await
98 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
99 }
100
101 async fn dispatch_at_with_info(
103 &self,
104 info: &JobInfo,
105 args: serde_json::Value,
106 scheduled_at: DateTime<Utc>,
107 ) -> Result<Uuid, forge_core::ForgeError> {
108 let mut job = JobRecord::new(
109 info.name,
110 args,
111 info.priority,
112 info.retry.max_attempts as i32,
113 )
114 .with_scheduled_at(scheduled_at);
115
116 if let Some(cap) = info.worker_capability {
117 job = job.with_capability(cap);
118 }
119
120 self.queue
121 .enqueue(job)
122 .await
123 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
124 }
125
126 pub async fn dispatch_idempotent<J: ForgeJob>(
128 &self,
129 idempotency_key: impl Into<String>,
130 args: J::Args,
131 ) -> Result<Uuid, forge_core::ForgeError>
132 where
133 J::Args: serde::Serialize,
134 {
135 let info = J::info();
136 let mut job = JobRecord::new(
137 info.name,
138 serde_json::to_value(args)?,
139 info.priority,
140 info.retry.max_attempts as i32,
141 )
142 .with_idempotency_key(idempotency_key);
143
144 if let Some(cap) = info.worker_capability {
145 job = job.with_capability(cap);
146 }
147
148 self.queue
149 .enqueue(job)
150 .await
151 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
152 }
153
154 pub async fn dispatch_with_priority<J: ForgeJob>(
156 &self,
157 priority: JobPriority,
158 args: J::Args,
159 ) -> Result<Uuid, forge_core::ForgeError>
160 where
161 J::Args: serde::Serialize,
162 {
163 let info = J::info();
164 let mut job = JobRecord::new(
165 info.name,
166 serde_json::to_value(args)?,
167 priority,
168 info.retry.max_attempts as i32,
169 );
170
171 if let Some(cap) = info.worker_capability {
172 job = job.with_capability(cap);
173 }
174
175 self.queue
176 .enqueue(job)
177 .await
178 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
179 }
180}
181
182impl JobDispatch for JobDispatcher {
183 fn get_info(&self, job_type: &str) -> Option<JobInfo> {
184 self.registry.get(job_type).map(|e| e.info.clone())
185 }
186
187 fn dispatch_by_name(
188 &self,
189 job_type: &str,
190 args: serde_json::Value,
191 ) -> Pin<Box<dyn Future<Output = forge_core::Result<Uuid>> + Send + '_>> {
192 let job_type = job_type.to_string();
193 Box::pin(async move { self.dispatch_by_name(&job_type, args).await })
194 }
195}
196
197#[cfg(test)]
198mod tests {
199 use super::*;
200
201 #[tokio::test]
202 async fn test_dispatcher_creation() {
203 let pool = sqlx::postgres::PgPoolOptions::new()
204 .max_connections(1)
205 .connect_lazy("postgres://localhost/nonexistent")
206 .expect("Failed to create mock pool");
207 let queue = JobQueue::new(pool);
208 let registry = JobRegistry::new();
209 let _dispatcher = JobDispatcher::new(queue, registry);
210 }
211}