Skip to main content

forge_runtime/jobs/
dispatcher.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use forge_core::function::JobDispatch;
7use forge_core::job::{ForgeJob, JobInfo, JobPriority};
8use uuid::Uuid;
9
10use super::queue::{JobQueue, JobRecord};
11use super::registry::JobRegistry;
12
13/// Dispatches jobs to the queue.
14#[derive(Clone)]
15pub struct JobDispatcher {
16    queue: JobQueue,
17    registry: JobRegistry,
18}
19
20impl JobDispatcher {
21    /// Create a new job dispatcher.
22    pub fn new(queue: JobQueue, registry: JobRegistry) -> Self {
23        Self { queue, registry }
24    }
25
26    /// Dispatch a job immediately.
27    pub async fn dispatch<J: ForgeJob>(&self, args: J::Args) -> Result<Uuid, forge_core::ForgeError>
28    where
29        J::Args: serde::Serialize,
30    {
31        let info = J::info();
32        self.dispatch_with_info(&info, serde_json::to_value(args)?, None)
33            .await
34    }
35
36    /// Dispatch a job with a delay.
37    pub async fn dispatch_in<J: ForgeJob>(
38        &self,
39        delay: Duration,
40        args: J::Args,
41    ) -> Result<Uuid, forge_core::ForgeError>
42    where
43        J::Args: serde::Serialize,
44    {
45        let info = J::info();
46        let scheduled_at = Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
47        self.dispatch_at_with_info(&info, serde_json::to_value(args)?, scheduled_at)
48            .await
49    }
50
51    /// Dispatch a job at a specific time.
52    pub async fn dispatch_at<J: ForgeJob>(
53        &self,
54        at: DateTime<Utc>,
55        args: J::Args,
56    ) -> Result<Uuid, forge_core::ForgeError>
57    where
58        J::Args: serde::Serialize,
59    {
60        let info = J::info();
61        self.dispatch_at_with_info(&info, serde_json::to_value(args)?, at)
62            .await
63    }
64
65    /// Dispatch job by name (dynamic).
66    pub async fn dispatch_by_name(
67        &self,
68        job_type: &str,
69        args: serde_json::Value,
70        owner_subject: Option<String>,
71    ) -> Result<Uuid, forge_core::ForgeError> {
72        let entry = self.registry.get(job_type).ok_or_else(|| {
73            forge_core::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
74        })?;
75
76        self.dispatch_with_info(&entry.info, args, owner_subject)
77            .await
78    }
79
80    /// Dispatch job with explicit info.
81    async fn dispatch_with_info(
82        &self,
83        info: &JobInfo,
84        args: serde_json::Value,
85        owner_subject: Option<String>,
86    ) -> Result<Uuid, forge_core::ForgeError> {
87        let mut job = JobRecord::new(
88            info.name,
89            args,
90            info.priority,
91            info.retry.max_attempts as i32,
92        )
93        .with_owner_subject(owner_subject);
94
95        if let Some(cap) = info.worker_capability {
96            job = job.with_capability(cap);
97        }
98
99        self.queue
100            .enqueue(job)
101            .await
102            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
103    }
104
105    /// Request cancellation for a job.
106    pub async fn cancel(
107        &self,
108        job_id: Uuid,
109        reason: Option<&str>,
110    ) -> Result<bool, forge_core::ForgeError> {
111        self.queue
112            .request_cancel(job_id, reason)
113            .await
114            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
115    }
116
117    /// Dispatch job at specific time with explicit info.
118    async fn dispatch_at_with_info(
119        &self,
120        info: &JobInfo,
121        args: serde_json::Value,
122        scheduled_at: DateTime<Utc>,
123    ) -> Result<Uuid, forge_core::ForgeError> {
124        let mut job = JobRecord::new(
125            info.name,
126            args,
127            info.priority,
128            info.retry.max_attempts as i32,
129        )
130        .with_scheduled_at(scheduled_at);
131
132        if let Some(cap) = info.worker_capability {
133            job = job.with_capability(cap);
134        }
135
136        self.queue
137            .enqueue(job)
138            .await
139            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
140    }
141
142    /// Dispatch job with idempotency key.
143    pub async fn dispatch_idempotent<J: ForgeJob>(
144        &self,
145        idempotency_key: impl Into<String>,
146        args: J::Args,
147    ) -> Result<Uuid, forge_core::ForgeError>
148    where
149        J::Args: serde::Serialize,
150    {
151        let info = J::info();
152        let mut job = JobRecord::new(
153            info.name,
154            serde_json::to_value(args)?,
155            info.priority,
156            info.retry.max_attempts as i32,
157        )
158        .with_idempotency_key(idempotency_key);
159
160        if let Some(cap) = info.worker_capability {
161            job = job.with_capability(cap);
162        }
163
164        self.queue
165            .enqueue(job)
166            .await
167            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
168    }
169
170    /// Dispatch job with custom priority.
171    pub async fn dispatch_with_priority<J: ForgeJob>(
172        &self,
173        priority: JobPriority,
174        args: J::Args,
175    ) -> Result<Uuid, forge_core::ForgeError>
176    where
177        J::Args: serde::Serialize,
178    {
179        let info = J::info();
180        let mut job = JobRecord::new(
181            info.name,
182            serde_json::to_value(args)?,
183            priority,
184            info.retry.max_attempts as i32,
185        );
186
187        if let Some(cap) = info.worker_capability {
188            job = job.with_capability(cap);
189        }
190
191        self.queue
192            .enqueue(job)
193            .await
194            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
195    }
196}
197
198impl JobDispatch for JobDispatcher {
199    fn get_info(&self, job_type: &str) -> Option<JobInfo> {
200        self.registry.get(job_type).map(|e| e.info.clone())
201    }
202
203    fn dispatch_by_name(
204        &self,
205        job_type: &str,
206        args: serde_json::Value,
207        owner_subject: Option<String>,
208    ) -> Pin<Box<dyn Future<Output = forge_core::Result<Uuid>> + Send + '_>> {
209        let job_type = job_type.to_string();
210        Box::pin(async move { self.dispatch_by_name(&job_type, args, owner_subject).await })
211    }
212
213    fn cancel(
214        &self,
215        job_id: Uuid,
216        reason: Option<String>,
217    ) -> Pin<Box<dyn Future<Output = forge_core::Result<bool>> + Send + '_>> {
218        Box::pin(async move { self.cancel(job_id, reason.as_deref()).await })
219    }
220}
221
222#[cfg(test)]
223mod tests {
224    use super::*;
225
226    #[tokio::test]
227    async fn test_dispatcher_creation() {
228        let pool = sqlx::postgres::PgPoolOptions::new()
229            .max_connections(1)
230            .connect_lazy("postgres://localhost/nonexistent")
231            .expect("Failed to create mock pool");
232        let queue = JobQueue::new(pool);
233        let registry = JobRegistry::new();
234        let _dispatcher = JobDispatcher::new(queue, registry);
235    }
236}