Skip to main content

forge_runtime/jobs/
dispatcher.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use forge_core::function::JobDispatch;
7use forge_core::job::{ForgeJob, JobInfo, JobPriority};
8use uuid::Uuid;
9
10use super::queue::{JobQueue, JobRecord};
11use super::registry::JobRegistry;
12
13/// Dispatches jobs to the queue.
14#[derive(Clone)]
15pub struct JobDispatcher {
16    queue: JobQueue,
17    registry: JobRegistry,
18}
19
20impl JobDispatcher {
21    /// Create a new job dispatcher.
22    pub fn new(queue: JobQueue, registry: JobRegistry) -> Self {
23        Self { queue, registry }
24    }
25
26    /// Dispatch a job immediately.
27    pub async fn dispatch<J: ForgeJob>(&self, args: J::Args) -> Result<Uuid, forge_core::ForgeError>
28    where
29        J::Args: serde::Serialize,
30    {
31        let info = J::info();
32        self.dispatch_with_info(&info, serde_json::to_value(args)?)
33            .await
34    }
35
36    /// Dispatch a job with a delay.
37    pub async fn dispatch_in<J: ForgeJob>(
38        &self,
39        delay: Duration,
40        args: J::Args,
41    ) -> Result<Uuid, forge_core::ForgeError>
42    where
43        J::Args: serde::Serialize,
44    {
45        let info = J::info();
46        let scheduled_at = Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
47        self.dispatch_at_with_info(&info, serde_json::to_value(args)?, scheduled_at)
48            .await
49    }
50
51    /// Dispatch a job at a specific time.
52    pub async fn dispatch_at<J: ForgeJob>(
53        &self,
54        at: DateTime<Utc>,
55        args: J::Args,
56    ) -> Result<Uuid, forge_core::ForgeError>
57    where
58        J::Args: serde::Serialize,
59    {
60        let info = J::info();
61        self.dispatch_at_with_info(&info, serde_json::to_value(args)?, at)
62            .await
63    }
64
65    /// Dispatch job by name (dynamic).
66    pub async fn dispatch_by_name(
67        &self,
68        job_type: &str,
69        args: serde_json::Value,
70    ) -> Result<Uuid, forge_core::ForgeError> {
71        let entry = self.registry.get(job_type).ok_or_else(|| {
72            forge_core::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
73        })?;
74
75        self.dispatch_with_info(&entry.info, args).await
76    }
77
78    /// Dispatch job with explicit info.
79    async fn dispatch_with_info(
80        &self,
81        info: &JobInfo,
82        args: serde_json::Value,
83    ) -> Result<Uuid, forge_core::ForgeError> {
84        let mut job = JobRecord::new(
85            info.name,
86            args,
87            info.priority,
88            info.retry.max_attempts as i32,
89        );
90
91        if let Some(cap) = info.worker_capability {
92            job = job.with_capability(cap);
93        }
94
95        self.queue
96            .enqueue(job)
97            .await
98            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
99    }
100
101    /// Request cancellation for a job.
102    pub async fn cancel(
103        &self,
104        job_id: Uuid,
105        reason: Option<&str>,
106    ) -> Result<bool, forge_core::ForgeError> {
107        self.queue
108            .request_cancel(job_id, reason)
109            .await
110            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
111    }
112
113    /// Dispatch job at specific time with explicit info.
114    async fn dispatch_at_with_info(
115        &self,
116        info: &JobInfo,
117        args: serde_json::Value,
118        scheduled_at: DateTime<Utc>,
119    ) -> Result<Uuid, forge_core::ForgeError> {
120        let mut job = JobRecord::new(
121            info.name,
122            args,
123            info.priority,
124            info.retry.max_attempts as i32,
125        )
126        .with_scheduled_at(scheduled_at);
127
128        if let Some(cap) = info.worker_capability {
129            job = job.with_capability(cap);
130        }
131
132        self.queue
133            .enqueue(job)
134            .await
135            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
136    }
137
138    /// Dispatch job with idempotency key.
139    pub async fn dispatch_idempotent<J: ForgeJob>(
140        &self,
141        idempotency_key: impl Into<String>,
142        args: J::Args,
143    ) -> Result<Uuid, forge_core::ForgeError>
144    where
145        J::Args: serde::Serialize,
146    {
147        let info = J::info();
148        let mut job = JobRecord::new(
149            info.name,
150            serde_json::to_value(args)?,
151            info.priority,
152            info.retry.max_attempts as i32,
153        )
154        .with_idempotency_key(idempotency_key);
155
156        if let Some(cap) = info.worker_capability {
157            job = job.with_capability(cap);
158        }
159
160        self.queue
161            .enqueue(job)
162            .await
163            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
164    }
165
166    /// Dispatch job with custom priority.
167    pub async fn dispatch_with_priority<J: ForgeJob>(
168        &self,
169        priority: JobPriority,
170        args: J::Args,
171    ) -> Result<Uuid, forge_core::ForgeError>
172    where
173        J::Args: serde::Serialize,
174    {
175        let info = J::info();
176        let mut job = JobRecord::new(
177            info.name,
178            serde_json::to_value(args)?,
179            priority,
180            info.retry.max_attempts as i32,
181        );
182
183        if let Some(cap) = info.worker_capability {
184            job = job.with_capability(cap);
185        }
186
187        self.queue
188            .enqueue(job)
189            .await
190            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
191    }
192}
193
194impl JobDispatch for JobDispatcher {
195    fn get_info(&self, job_type: &str) -> Option<JobInfo> {
196        self.registry.get(job_type).map(|e| e.info.clone())
197    }
198
199    fn dispatch_by_name(
200        &self,
201        job_type: &str,
202        args: serde_json::Value,
203    ) -> Pin<Box<dyn Future<Output = forge_core::Result<Uuid>> + Send + '_>> {
204        let job_type = job_type.to_string();
205        Box::pin(async move { self.dispatch_by_name(&job_type, args).await })
206    }
207
208    fn cancel(
209        &self,
210        job_id: Uuid,
211        reason: Option<String>,
212    ) -> Pin<Box<dyn Future<Output = forge_core::Result<bool>> + Send + '_>> {
213        Box::pin(async move { self.cancel(job_id, reason.as_deref()).await })
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: a dispatcher can be assembled from a lazily-created pool
    /// and a freshly created registry. Nothing here issues a query, so the
    /// nonexistent database in the URL is never needed.
    #[tokio::test]
    async fn test_dispatcher_creation() {
        let lazy_pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool");

        let _dispatcher = JobDispatcher::new(JobQueue::new(lazy_pool), JobRegistry::new());
    }
}