forge_runtime/jobs/
dispatcher.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use forge_core::function::JobDispatch;
7use forge_core::job::{ForgeJob, JobInfo, JobPriority};
8use uuid::Uuid;
9
10use super::queue::{JobQueue, JobRecord};
11use super::registry::JobRegistry;
12
13/// Dispatches jobs to the queue.
14#[derive(Clone)]
15pub struct JobDispatcher {
16    queue: JobQueue,
17    registry: JobRegistry,
18}
19
20impl JobDispatcher {
21    /// Create a new job dispatcher.
22    pub fn new(queue: JobQueue, registry: JobRegistry) -> Self {
23        Self { queue, registry }
24    }
25
26    /// Dispatch a job immediately.
27    pub async fn dispatch<J: ForgeJob>(&self, args: J::Args) -> Result<Uuid, forge_core::ForgeError>
28    where
29        J::Args: serde::Serialize,
30    {
31        let info = J::info();
32        self.dispatch_with_info(&info, serde_json::to_value(args)?)
33            .await
34    }
35
36    /// Dispatch a job with a delay.
37    pub async fn dispatch_in<J: ForgeJob>(
38        &self,
39        delay: Duration,
40        args: J::Args,
41    ) -> Result<Uuid, forge_core::ForgeError>
42    where
43        J::Args: serde::Serialize,
44    {
45        let info = J::info();
46        let scheduled_at = Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
47        self.dispatch_at_with_info(&info, serde_json::to_value(args)?, scheduled_at)
48            .await
49    }
50
51    /// Dispatch a job at a specific time.
52    pub async fn dispatch_at<J: ForgeJob>(
53        &self,
54        at: DateTime<Utc>,
55        args: J::Args,
56    ) -> Result<Uuid, forge_core::ForgeError>
57    where
58        J::Args: serde::Serialize,
59    {
60        let info = J::info();
61        self.dispatch_at_with_info(&info, serde_json::to_value(args)?, at)
62            .await
63    }
64
65    /// Dispatch job by name (dynamic).
66    pub async fn dispatch_by_name(
67        &self,
68        job_type: &str,
69        args: serde_json::Value,
70    ) -> Result<Uuid, forge_core::ForgeError> {
71        let entry = self.registry.get(job_type).ok_or_else(|| {
72            forge_core::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
73        })?;
74
75        self.dispatch_with_info(&entry.info, args).await
76    }
77
78    /// Dispatch job with explicit info.
79    async fn dispatch_with_info(
80        &self,
81        info: &JobInfo,
82        args: serde_json::Value,
83    ) -> Result<Uuid, forge_core::ForgeError> {
84        let mut job = JobRecord::new(
85            info.name,
86            args,
87            info.priority,
88            info.retry.max_attempts as i32,
89        );
90
91        if let Some(cap) = info.worker_capability {
92            job = job.with_capability(cap);
93        }
94
95        self.queue
96            .enqueue(job)
97            .await
98            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
99    }
100
101    /// Dispatch job at specific time with explicit info.
102    async fn dispatch_at_with_info(
103        &self,
104        info: &JobInfo,
105        args: serde_json::Value,
106        scheduled_at: DateTime<Utc>,
107    ) -> Result<Uuid, forge_core::ForgeError> {
108        let mut job = JobRecord::new(
109            info.name,
110            args,
111            info.priority,
112            info.retry.max_attempts as i32,
113        )
114        .with_scheduled_at(scheduled_at);
115
116        if let Some(cap) = info.worker_capability {
117            job = job.with_capability(cap);
118        }
119
120        self.queue
121            .enqueue(job)
122            .await
123            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
124    }
125
126    /// Dispatch job with idempotency key.
127    pub async fn dispatch_idempotent<J: ForgeJob>(
128        &self,
129        idempotency_key: impl Into<String>,
130        args: J::Args,
131    ) -> Result<Uuid, forge_core::ForgeError>
132    where
133        J::Args: serde::Serialize,
134    {
135        let info = J::info();
136        let mut job = JobRecord::new(
137            info.name,
138            serde_json::to_value(args)?,
139            info.priority,
140            info.retry.max_attempts as i32,
141        )
142        .with_idempotency_key(idempotency_key);
143
144        if let Some(cap) = info.worker_capability {
145            job = job.with_capability(cap);
146        }
147
148        self.queue
149            .enqueue(job)
150            .await
151            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
152    }
153
154    /// Dispatch job with custom priority.
155    pub async fn dispatch_with_priority<J: ForgeJob>(
156        &self,
157        priority: JobPriority,
158        args: J::Args,
159    ) -> Result<Uuid, forge_core::ForgeError>
160    where
161        J::Args: serde::Serialize,
162    {
163        let info = J::info();
164        let mut job = JobRecord::new(
165            info.name,
166            serde_json::to_value(args)?,
167            priority,
168            info.retry.max_attempts as i32,
169        );
170
171        if let Some(cap) = info.worker_capability {
172            job = job.with_capability(cap);
173        }
174
175        self.queue
176            .enqueue(job)
177            .await
178            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))
179    }
180}
181
182impl JobDispatch for JobDispatcher {
183    fn get_info(&self, job_type: &str) -> Option<JobInfo> {
184        self.registry.get(job_type).map(|e| e.info.clone())
185    }
186
187    fn dispatch_by_name(
188        &self,
189        job_type: &str,
190        args: serde_json::Value,
191    ) -> Pin<Box<dyn Future<Output = forge_core::Result<Uuid>> + Send + '_>> {
192        let job_type = job_type.to_string();
193        Box::pin(async move { self.dispatch_by_name(&job_type, args).await })
194    }
195}
196
197#[cfg(test)]
198mod tests {
199    use super::*;
200
201    #[tokio::test]
202    async fn test_dispatcher_creation() {
203        let pool = sqlx::postgres::PgPoolOptions::new()
204            .max_connections(1)
205            .connect_lazy("postgres://localhost/nonexistent")
206            .expect("Failed to create mock pool");
207        let queue = JobQueue::new(pool);
208        let registry = JobRegistry::new();
209        let _dispatcher = JobDispatcher::new(queue, registry);
210    }
211}