use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
use super::requests::PdfRequest;
/// Lifecycle state of a [`PdfJob`].
///
/// Serialized with serde using lowercase variant names (for example
/// `Queued` becomes `"queued"`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum JobStatus {
    /// The job has been accepted and is waiting for the worker.
    Queued,
    /// The worker has picked the job up.
    Processing,
    /// The job finished and a result was stored.
    Completed,
    /// The job finished with an error.
    Failed,
    /// The job was cancelled through the queue.
    Cancelled,
}
/// A single PDF job together with its bookkeeping state.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PdfJob {
    /// Unique identifier; `PdfJob::new` fills it with a random (v4) UUID string.
    pub id: String,
    /// The request this job was created from.
    pub request: PdfRequest,
    /// Current lifecycle state.
    pub status: JobStatus,
    /// When the job was created (UTC).
    pub created_at: DateTime<Utc>,
    /// When the job's state was last changed (UTC).
    pub updated_at: DateTime<Utc>,
    /// Output stored by `set_result`; `None` until then.
    pub result: Option<String>,
    /// Message stored by `set_error`; `None` until then.
    pub error: Option<String>,
}
impl PdfJob {
    /// Builds a new job for `request`, identified by a fresh random (v4) UUID.
    ///
    /// The job starts as [`JobStatus::Queued`] with no result and no error;
    /// `created_at` and `updated_at` are set to the same instant.
    pub fn new(request: PdfRequest) -> Self {
        let timestamp = Utc::now();
        let id = Uuid::new_v4().to_string();
        Self {
            id,
            request,
            status: JobStatus::Queued,
            created_at: timestamp,
            updated_at: timestamp,
            result: None,
            error: None,
        }
    }

    /// Moves the job to `status` and refreshes `updated_at`.
    pub fn update_status(&mut self, status: JobStatus) {
        self.updated_at = Utc::now();
        self.status = status;
    }

    /// Stores `result`, marks the job [`JobStatus::Completed`] and refreshes
    /// `updated_at`. A previously stored error is left untouched.
    pub fn set_result(&mut self, result: String) {
        self.result = Some(result);
        self.update_status(JobStatus::Completed);
    }

    /// Stores `error`, marks the job [`JobStatus::Failed`] and refreshes
    /// `updated_at`. A previously stored result is left untouched.
    pub fn set_error(&mut self, error: String) {
        self.error = Some(error);
        self.update_status(JobStatus::Failed);
    }
}
/// An in-process job queue: a shared map of job records plus a channel that
/// feeds a background worker task.
pub struct JobQueue {
    /// Every job handed to `enqueue`, keyed by job id; shared with the worker.
    jobs: Arc<RwLock<HashMap<String, PdfJob>>>,
    /// Sending half of the channel the worker receives jobs from.
    tx: mpsc::Sender<PdfJob>,
    /// Handle of the spawned worker task; stored but never awaited here.
    _handle: Option<tokio::task::JoinHandle<()>>,
}
impl JobQueue {
pub fn new() -> Self {
Self::with_capacity(1000)
}
pub fn with_capacity(capacity: usize) -> Self {
let jobs = Arc::new(RwLock::new(HashMap::new()));
let (tx, rx) = mpsc::channel(capacity);
let queue_jobs = jobs.clone();
let handle = tokio::spawn(async move {
Self::process_jobs(queue_jobs, rx).await;
});
Self {
jobs,
tx,
_handle: Some(handle),
}
}
pub async fn enqueue(&self, mut job: PdfJob) -> anyhow::Result<()> {
job.update_status(JobStatus::Queued);
{
let mut jobs = self.jobs.write().await;
jobs.insert(job.id.clone(), job.clone());
}
self.tx.send(job).await?;
Ok(())
}
pub async fn get_status(&self, id: &str) -> Option<JobStatus> {
let jobs = self.jobs.read().await;
jobs.get(id).map(|job| job.status.clone())
}
pub async fn get_result(&self, id: &str) -> Option<String> {
let jobs = self.jobs.read().await;
jobs.get(id).and_then(|job| job.result.clone())
}
pub async fn get_error(&self, id: &str) -> Option<String> {
let jobs = self.jobs.read().await;
jobs.get(id).and_then(|job| job.error.clone())
}
pub async fn cancel(&self, id: &str) -> anyhow::Result<()> {
let mut jobs = self.jobs.write().await;
if let Some(job) = jobs.get_mut(id) {
job.update_status(JobStatus::Cancelled);
Ok(())
} else {
anyhow::bail!("Job not found")
}
}
async fn process_jobs(
jobs: Arc<RwLock<HashMap<String, PdfJob>>>,
mut rx: mpsc::Receiver<PdfJob>,
) {
while let Some(job) = rx.recv().await {
let job_id = job.id.clone();
{
let mut jobs_lock = jobs.write().await;
if let Some(stored_job) = jobs_lock.get_mut(&job_id) {
stored_job.update_status(JobStatus::Processing);
}
}
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
{
let mut jobs_lock = jobs.write().await;
if let Some(stored_job) = jobs_lock.get_mut(&job_id) {
stored_job.set_result("Processed PDF content".to_string());
if let Some(webhook_url) = &stored_job.request.webhook_url {
Self::send_webhook(webhook_url, stored_job).await;
}
}
}
}
}
async fn send_webhook(url: &str, job: &PdfJob) {
let client = reqwest::Client::new();
let payload = serde_json::json!({
"job_id": job.id,
"status": job.status,
"result": job.result,
"error": job.error,
});
if let Err(e) = client.post(url).json(&payload).send().await {
tracing::error!("Failed to send webhook to {}: {:?}", url, e);
}
}
}
impl Default for JobQueue {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::requests::{PdfOptions, RequestMetadata};

    /// Request fixture shared by the tests: default options and metadata,
    /// no webhook.
    fn sample_request() -> PdfRequest {
        PdfRequest {
            url: "https://example.com/test.pdf".to_string(),
            options: PdfOptions::default(),
            webhook_url: None,
            metadata: RequestMetadata::default(),
        }
    }

    /// A freshly created job is queued and has neither result nor error.
    #[tokio::test]
    async fn test_job_creation() {
        let job = PdfJob::new(sample_request());

        assert_eq!(job.status, JobStatus::Queued);
        assert!(job.result.is_none());
        assert!(job.error.is_none());
    }

    /// An enqueued job can be looked up by its id.
    #[tokio::test]
    async fn test_job_queue_enqueue() {
        let queue = JobQueue::new();
        let job = PdfJob::new(sample_request());
        let job_id = job.id.clone();

        queue.enqueue(job).await.unwrap();

        assert!(queue.get_status(&job_id).await.is_some());
    }

    /// Cancelling right after enqueueing leaves the job in the cancelled state.
    #[tokio::test]
    async fn test_job_cancellation() {
        let queue = JobQueue::new();
        let job = PdfJob::new(sample_request());
        let job_id = job.id.clone();

        queue.enqueue(job).await.unwrap();
        queue.cancel(&job_id).await.unwrap();

        assert_eq!(
            queue.get_status(&job_id).await,
            Some(JobStatus::Cancelled)
        );
    }
}