Skip to main content

forge_runtime/jobs/
dispatcher.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use forge_core::function::JobDispatch;
7use forge_core::job::{ForgeJob, JobInfo, JobPriority};
8use uuid::Uuid;
9
10use super::queue::{JobQueue, JobRecord};
11use super::registry::JobRegistry;
12
13/// Dispatches jobs to the queue.
14#[derive(Clone)]
15pub struct JobDispatcher {
16    queue: JobQueue,
17    registry: JobRegistry,
18}
19
20impl JobDispatcher {
21    /// Create a new job dispatcher.
22    pub fn new(queue: JobQueue, registry: JobRegistry) -> Self {
23        Self { queue, registry }
24    }
25
26    /// Dispatch a job immediately.
27    pub async fn dispatch<J: ForgeJob>(&self, args: J::Args) -> Result<Uuid, forge_core::ForgeError>
28    where
29        J::Args: serde::Serialize,
30    {
31        let info = J::info();
32        self.dispatch_with_info(&info, serde_json::to_value(args)?, None)
33            .await
34    }
35
36    /// Dispatch a job with a delay.
37    pub async fn dispatch_in<J: ForgeJob>(
38        &self,
39        delay: Duration,
40        args: J::Args,
41    ) -> Result<Uuid, forge_core::ForgeError>
42    where
43        J::Args: serde::Serialize,
44    {
45        let info = J::info();
46        let scheduled_at = Utc::now()
47            + chrono::Duration::from_std(delay)
48                .map_err(|_| forge_core::ForgeError::InvalidArgument("delay too large".into()))?;
49        self.dispatch_at_with_info(&info, serde_json::to_value(args)?, scheduled_at)
50            .await
51    }
52
53    /// Dispatch a job at a specific time.
54    pub async fn dispatch_at<J: ForgeJob>(
55        &self,
56        at: DateTime<Utc>,
57        args: J::Args,
58    ) -> Result<Uuid, forge_core::ForgeError>
59    where
60        J::Args: serde::Serialize,
61    {
62        let info = J::info();
63        self.dispatch_at_with_info(&info, serde_json::to_value(args)?, at)
64            .await
65    }
66
67    /// Dispatch job by name (dynamic).
68    pub async fn dispatch_by_name(
69        &self,
70        job_type: &str,
71        args: serde_json::Value,
72        owner_subject: Option<String>,
73    ) -> Result<Uuid, forge_core::ForgeError> {
74        let entry = self.registry.get(job_type).ok_or_else(|| {
75            forge_core::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
76        })?;
77
78        self.dispatch_with_info(&entry.info, args, owner_subject)
79            .await
80    }
81
82    /// Dispatch job with explicit info.
83    async fn dispatch_with_info(
84        &self,
85        info: &JobInfo,
86        args: serde_json::Value,
87        owner_subject: Option<String>,
88    ) -> Result<Uuid, forge_core::ForgeError> {
89        let mut job = JobRecord::new(
90            info.name,
91            args,
92            info.priority,
93            info.retry.max_attempts as i32,
94        )
95        .with_owner_subject(owner_subject);
96
97        if let Some(cap) = info.worker_capability {
98            job = job.with_capability(cap);
99        }
100
101        self.queue
102            .enqueue(job)
103            .await
104            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
105    }
106
107    /// Request cancellation for a job.
108    pub async fn cancel(
109        &self,
110        job_id: Uuid,
111        reason: Option<&str>,
112    ) -> Result<bool, forge_core::ForgeError> {
113        self.queue
114            .request_cancel(job_id, reason)
115            .await
116            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
117    }
118
119    /// Dispatch job at specific time with explicit info.
120    async fn dispatch_at_with_info(
121        &self,
122        info: &JobInfo,
123        args: serde_json::Value,
124        scheduled_at: DateTime<Utc>,
125    ) -> Result<Uuid, forge_core::ForgeError> {
126        let mut job = JobRecord::new(
127            info.name,
128            args,
129            info.priority,
130            info.retry.max_attempts as i32,
131        )
132        .with_scheduled_at(scheduled_at);
133
134        if let Some(cap) = info.worker_capability {
135            job = job.with_capability(cap);
136        }
137
138        self.queue
139            .enqueue(job)
140            .await
141            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
142    }
143
144    /// Dispatch job with idempotency key.
145    pub async fn dispatch_idempotent<J: ForgeJob>(
146        &self,
147        idempotency_key: impl Into<String>,
148        args: J::Args,
149    ) -> Result<Uuid, forge_core::ForgeError>
150    where
151        J::Args: serde::Serialize,
152    {
153        let info = J::info();
154        let mut job = JobRecord::new(
155            info.name,
156            serde_json::to_value(args)?,
157            info.priority,
158            info.retry.max_attempts as i32,
159        )
160        .with_idempotency_key(idempotency_key);
161
162        if let Some(cap) = info.worker_capability {
163            job = job.with_capability(cap);
164        }
165
166        self.queue
167            .enqueue(job)
168            .await
169            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
170    }
171
172    /// Dispatch job with custom priority.
173    pub async fn dispatch_with_priority<J: ForgeJob>(
174        &self,
175        priority: JobPriority,
176        args: J::Args,
177    ) -> Result<Uuid, forge_core::ForgeError>
178    where
179        J::Args: serde::Serialize,
180    {
181        let info = J::info();
182        let mut job = JobRecord::new(
183            info.name,
184            serde_json::to_value(args)?,
185            priority,
186            info.retry.max_attempts as i32,
187        );
188
189        if let Some(cap) = info.worker_capability {
190            job = job.with_capability(cap);
191        }
192
193        self.queue
194            .enqueue(job)
195            .await
196            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
197    }
198}
199
200impl JobDispatch for JobDispatcher {
201    fn get_info(&self, job_type: &str) -> Option<JobInfo> {
202        self.registry.get(job_type).map(|e| e.info.clone())
203    }
204
205    fn dispatch_by_name(
206        &self,
207        job_type: &str,
208        args: serde_json::Value,
209        owner_subject: Option<String>,
210    ) -> Pin<Box<dyn Future<Output = forge_core::Result<Uuid>> + Send + '_>> {
211        let job_type = job_type.to_string();
212        Box::pin(async move { self.dispatch_by_name(&job_type, args, owner_subject).await })
213    }
214
215    fn cancel(
216        &self,
217        job_id: Uuid,
218        reason: Option<String>,
219    ) -> Pin<Box<dyn Future<Output = forge_core::Result<bool>> + Send + '_>> {
220        Box::pin(async move { self.cancel(job_id, reason.as_deref()).await })
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::*;

    /// Building a dispatcher should succeed without a reachable database:
    /// the pool is created with `connect_lazy`, and the test never issues a
    /// query against it.
    #[tokio::test]
    async fn test_dispatcher_creation() {
        let lazy_pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool");

        let _dispatcher = JobDispatcher::new(JobQueue::new(lazy_pool), JobRegistry::new());
    }
}