ruvector_scipix/api/
jobs.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::sync::Arc;
5use tokio::sync::{mpsc, RwLock};
6use uuid::Uuid;
7
8use super::requests::PdfRequest;
9
10/// Job status enumeration
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
12#[serde(rename_all = "lowercase")]
13pub enum JobStatus {
14    /// Job is queued but not started
15    Queued,
16    /// Job is currently processing
17    Processing,
18    /// Job completed successfully
19    Completed,
20    /// Job failed with error
21    Failed,
22    /// Job was cancelled
23    Cancelled,
24}
25
26/// PDF processing job
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct PdfJob {
29    /// Unique job identifier
30    pub id: String,
31
32    /// Original request
33    pub request: PdfRequest,
34
35    /// Current status
36    pub status: JobStatus,
37
38    /// Creation timestamp
39    pub created_at: DateTime<Utc>,
40
41    /// Last update timestamp
42    pub updated_at: DateTime<Utc>,
43
44    /// Processing result
45    pub result: Option<String>,
46
47    /// Error message (if failed)
48    pub error: Option<String>,
49}
50
51impl PdfJob {
52    /// Create a new PDF job
53    pub fn new(request: PdfRequest) -> Self {
54        let now = Utc::now();
55        Self {
56            id: Uuid::new_v4().to_string(),
57            request,
58            status: JobStatus::Queued,
59            created_at: now,
60            updated_at: now,
61            result: None,
62            error: None,
63        }
64    }
65
66    /// Update job status
67    pub fn update_status(&mut self, status: JobStatus) {
68        self.status = status;
69        self.updated_at = Utc::now();
70    }
71
72    /// Set job result
73    pub fn set_result(&mut self, result: String) {
74        self.result = Some(result);
75        self.status = JobStatus::Completed;
76        self.updated_at = Utc::now();
77    }
78
79    /// Set job error
80    pub fn set_error(&mut self, error: String) {
81        self.error = Some(error);
82        self.status = JobStatus::Failed;
83        self.updated_at = Utc::now();
84    }
85}
86
87/// Async job queue with webhook support
88pub struct JobQueue {
89    /// Job storage
90    jobs: Arc<RwLock<HashMap<String, PdfJob>>>,
91
92    /// Job submission channel
93    tx: mpsc::Sender<PdfJob>,
94
95    /// Job processing handle
96    _handle: Option<tokio::task::JoinHandle<()>>,
97}
98
99impl JobQueue {
100    /// Create a new job queue
101    pub fn new() -> Self {
102        Self::with_capacity(1000)
103    }
104
105    /// Create a job queue with specific capacity
106    pub fn with_capacity(capacity: usize) -> Self {
107        let jobs = Arc::new(RwLock::new(HashMap::new()));
108        let (tx, rx) = mpsc::channel(capacity);
109
110        let queue_jobs = jobs.clone();
111        let handle = tokio::spawn(async move {
112            Self::process_jobs(queue_jobs, rx).await;
113        });
114
115        Self {
116            jobs,
117            tx,
118            _handle: Some(handle),
119        }
120    }
121
122    /// Enqueue a new job
123    pub async fn enqueue(&self, mut job: PdfJob) -> anyhow::Result<()> {
124        job.update_status(JobStatus::Queued);
125
126        // Store job
127        {
128            let mut jobs = self.jobs.write().await;
129            jobs.insert(job.id.clone(), job.clone());
130        }
131
132        // Send to processing queue
133        self.tx.send(job).await?;
134
135        Ok(())
136    }
137
138    /// Get job status
139    pub async fn get_status(&self, id: &str) -> Option<JobStatus> {
140        let jobs = self.jobs.read().await;
141        jobs.get(id).map(|job| job.status.clone())
142    }
143
144    /// Get job result
145    pub async fn get_result(&self, id: &str) -> Option<String> {
146        let jobs = self.jobs.read().await;
147        jobs.get(id).and_then(|job| job.result.clone())
148    }
149
150    /// Get job error
151    pub async fn get_error(&self, id: &str) -> Option<String> {
152        let jobs = self.jobs.read().await;
153        jobs.get(id).and_then(|job| job.error.clone())
154    }
155
156    /// Cancel a job
157    pub async fn cancel(&self, id: &str) -> anyhow::Result<()> {
158        let mut jobs = self.jobs.write().await;
159        if let Some(job) = jobs.get_mut(id) {
160            job.update_status(JobStatus::Cancelled);
161            Ok(())
162        } else {
163            anyhow::bail!("Job not found")
164        }
165    }
166
167    /// Background job processor
168    async fn process_jobs(
169        jobs: Arc<RwLock<HashMap<String, PdfJob>>>,
170        mut rx: mpsc::Receiver<PdfJob>,
171    ) {
172        while let Some(job) = rx.recv().await {
173            let job_id = job.id.clone();
174
175            // Update status to processing
176            {
177                let mut jobs_lock = jobs.write().await;
178                if let Some(stored_job) = jobs_lock.get_mut(&job_id) {
179                    stored_job.update_status(JobStatus::Processing);
180                }
181            }
182
183            // Simulate PDF processing
184            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
185
186            // Update with result
187            {
188                let mut jobs_lock = jobs.write().await;
189                if let Some(stored_job) = jobs_lock.get_mut(&job_id) {
190                    stored_job.set_result("Processed PDF content".to_string());
191
192                    // Send webhook if specified
193                    if let Some(webhook_url) = &stored_job.request.webhook_url {
194                        Self::send_webhook(webhook_url, stored_job).await;
195                    }
196                }
197            }
198        }
199    }
200
201    /// Send webhook notification
202    async fn send_webhook(url: &str, job: &PdfJob) {
203        let client = reqwest::Client::new();
204        let payload = serde_json::json!({
205            "job_id": job.id,
206            "status": job.status,
207            "result": job.result,
208            "error": job.error,
209        });
210
211        if let Err(e) = client.post(url).json(&payload).send().await {
212            tracing::error!("Failed to send webhook to {}: {:?}", url, e);
213        }
214    }
215}
216
217impl Default for JobQueue {
218    fn default() -> Self {
219        Self::new()
220    }
221}
222
223#[cfg(test)]
224mod tests {
225    use super::*;
226    use crate::api::requests::{PdfOptions, RequestMetadata};
227
228    #[tokio::test]
229    async fn test_job_creation() {
230        let request = PdfRequest {
231            url: "https://example.com/test.pdf".to_string(),
232            options: PdfOptions::default(),
233            webhook_url: None,
234            metadata: RequestMetadata::default(),
235        };
236
237        let job = PdfJob::new(request);
238        assert_eq!(job.status, JobStatus::Queued);
239        assert!(job.result.is_none());
240        assert!(job.error.is_none());
241    }
242
243    #[tokio::test]
244    async fn test_job_queue_enqueue() {
245        let queue = JobQueue::new();
246        let request = PdfRequest {
247            url: "https://example.com/test.pdf".to_string(),
248            options: PdfOptions::default(),
249            webhook_url: None,
250            metadata: RequestMetadata::default(),
251        };
252
253        let job = PdfJob::new(request);
254        let job_id = job.id.clone();
255
256        queue.enqueue(job).await.unwrap();
257
258        let status = queue.get_status(&job_id).await;
259        assert!(status.is_some());
260    }
261
262    #[tokio::test]
263    async fn test_job_cancellation() {
264        let queue = JobQueue::new();
265        let request = PdfRequest {
266            url: "https://example.com/test.pdf".to_string(),
267            options: PdfOptions::default(),
268            webhook_url: None,
269            metadata: RequestMetadata::default(),
270        };
271
272        let job = PdfJob::new(request);
273        let job_id = job.id.clone();
274
275        queue.enqueue(job).await.unwrap();
276        queue.cancel(&job_id).await.unwrap();
277
278        let status = queue.get_status(&job_id).await;
279        assert_eq!(status, Some(JobStatus::Cancelled));
280    }
281}