ruvector_scipix/api/
jobs.rs1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::sync::Arc;
5use tokio::sync::{mpsc, RwLock};
6use uuid::Uuid;
7
8use super::requests::PdfRequest;
9
10#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
12#[serde(rename_all = "lowercase")]
13pub enum JobStatus {
14 Queued,
16 Processing,
18 Completed,
20 Failed,
22 Cancelled,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct PdfJob {
29 pub id: String,
31
32 pub request: PdfRequest,
34
35 pub status: JobStatus,
37
38 pub created_at: DateTime<Utc>,
40
41 pub updated_at: DateTime<Utc>,
43
44 pub result: Option<String>,
46
47 pub error: Option<String>,
49}
50
51impl PdfJob {
52 pub fn new(request: PdfRequest) -> Self {
54 let now = Utc::now();
55 Self {
56 id: Uuid::new_v4().to_string(),
57 request,
58 status: JobStatus::Queued,
59 created_at: now,
60 updated_at: now,
61 result: None,
62 error: None,
63 }
64 }
65
66 pub fn update_status(&mut self, status: JobStatus) {
68 self.status = status;
69 self.updated_at = Utc::now();
70 }
71
72 pub fn set_result(&mut self, result: String) {
74 self.result = Some(result);
75 self.status = JobStatus::Completed;
76 self.updated_at = Utc::now();
77 }
78
79 pub fn set_error(&mut self, error: String) {
81 self.error = Some(error);
82 self.status = JobStatus::Failed;
83 self.updated_at = Utc::now();
84 }
85}
86
87pub struct JobQueue {
89 jobs: Arc<RwLock<HashMap<String, PdfJob>>>,
91
92 tx: mpsc::Sender<PdfJob>,
94
95 _handle: Option<tokio::task::JoinHandle<()>>,
97}
98
99impl JobQueue {
100 pub fn new() -> Self {
102 Self::with_capacity(1000)
103 }
104
105 pub fn with_capacity(capacity: usize) -> Self {
107 let jobs = Arc::new(RwLock::new(HashMap::new()));
108 let (tx, rx) = mpsc::channel(capacity);
109
110 let queue_jobs = jobs.clone();
111 let handle = tokio::spawn(async move {
112 Self::process_jobs(queue_jobs, rx).await;
113 });
114
115 Self {
116 jobs,
117 tx,
118 _handle: Some(handle),
119 }
120 }
121
122 pub async fn enqueue(&self, mut job: PdfJob) -> anyhow::Result<()> {
124 job.update_status(JobStatus::Queued);
125
126 {
128 let mut jobs = self.jobs.write().await;
129 jobs.insert(job.id.clone(), job.clone());
130 }
131
132 self.tx.send(job).await?;
134
135 Ok(())
136 }
137
138 pub async fn get_status(&self, id: &str) -> Option<JobStatus> {
140 let jobs = self.jobs.read().await;
141 jobs.get(id).map(|job| job.status.clone())
142 }
143
144 pub async fn get_result(&self, id: &str) -> Option<String> {
146 let jobs = self.jobs.read().await;
147 jobs.get(id).and_then(|job| job.result.clone())
148 }
149
150 pub async fn get_error(&self, id: &str) -> Option<String> {
152 let jobs = self.jobs.read().await;
153 jobs.get(id).and_then(|job| job.error.clone())
154 }
155
156 pub async fn cancel(&self, id: &str) -> anyhow::Result<()> {
158 let mut jobs = self.jobs.write().await;
159 if let Some(job) = jobs.get_mut(id) {
160 job.update_status(JobStatus::Cancelled);
161 Ok(())
162 } else {
163 anyhow::bail!("Job not found")
164 }
165 }
166
167 async fn process_jobs(
169 jobs: Arc<RwLock<HashMap<String, PdfJob>>>,
170 mut rx: mpsc::Receiver<PdfJob>,
171 ) {
172 while let Some(job) = rx.recv().await {
173 let job_id = job.id.clone();
174
175 {
177 let mut jobs_lock = jobs.write().await;
178 if let Some(stored_job) = jobs_lock.get_mut(&job_id) {
179 stored_job.update_status(JobStatus::Processing);
180 }
181 }
182
183 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
185
186 {
188 let mut jobs_lock = jobs.write().await;
189 if let Some(stored_job) = jobs_lock.get_mut(&job_id) {
190 stored_job.set_result("Processed PDF content".to_string());
191
192 if let Some(webhook_url) = &stored_job.request.webhook_url {
194 Self::send_webhook(webhook_url, stored_job).await;
195 }
196 }
197 }
198 }
199 }
200
201 async fn send_webhook(url: &str, job: &PdfJob) {
203 let client = reqwest::Client::new();
204 let payload = serde_json::json!({
205 "job_id": job.id,
206 "status": job.status,
207 "result": job.result,
208 "error": job.error,
209 });
210
211 if let Err(e) = client.post(url).json(&payload).send().await {
212 tracing::error!("Failed to send webhook to {}: {:?}", url, e);
213 }
214 }
215}
216
217impl Default for JobQueue {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::requests::{PdfOptions, RequestMetadata};

    /// Request fixture shared by all tests: example URL, default options and
    /// metadata, no webhook.
    fn sample_request() -> PdfRequest {
        PdfRequest {
            url: "https://example.com/test.pdf".to_string(),
            options: PdfOptions::default(),
            webhook_url: None,
            metadata: RequestMetadata::default(),
        }
    }

    /// A new job starts queued with neither result nor error.
    #[tokio::test]
    async fn test_job_creation() {
        let job = PdfJob::new(sample_request());

        assert_eq!(job.status, JobStatus::Queued);
        assert!(job.result.is_none());
        assert!(job.error.is_none());
    }

    /// An enqueued job can be looked up by its id.
    #[tokio::test]
    async fn test_job_queue_enqueue() {
        let queue = JobQueue::new();
        let job = PdfJob::new(sample_request());
        let job_id = job.id.clone();

        queue.enqueue(job).await.unwrap();

        assert!(queue.get_status(&job_id).await.is_some());
    }

    /// Cancelling a freshly enqueued job reports it as cancelled.
    #[tokio::test]
    async fn test_job_cancellation() {
        let queue = JobQueue::new();
        let job = PdfJob::new(sample_request());
        let job_id = job.id.clone();

        queue.enqueue(job).await.unwrap();
        queue.cancel(&job_id).await.unwrap();

        assert_eq!(
            queue.get_status(&job_id).await,
            Some(JobStatus::Cancelled)
        );
    }
}