use async_trait::async_trait;
use lilqueue::{BackoffStrategy, Job, JobError, JobProcessor, ProcessorOptions};
use lilqueue_seaorm::{SeaOrmQueue, SeaOrmQueueOptions};
use std::{path::Path, time::Duration};
use tempfile::tempdir;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct WriteFileJob {
output_path: String,
line: String,
}
#[async_trait]
impl Job for WriteFileJob {
async fn process(&self) -> Result<(), JobError> {
use std::io::Write;
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.output_path)
.map_err(|e| JobError::permanent(e.to_string()))?;
writeln!(file, "{}", self.line).map_err(|e| JobError::permanent(e.to_string()))?;
Ok(())
}
}
#[tokio::test]
async fn seaorm_queue_processes_job() {
let dir = tempdir().unwrap();
let queue = SeaOrmQueue::connect(
&sqlite_url(dir.path().join("queue.db")),
SeaOrmQueueOptions::default(),
)
.await
.unwrap();
let processor = JobProcessor::<WriteFileJob, _>::new(queue, test_options());
let worker = processor.spawn_worker();
let output_path = dir.path().join("output.log");
processor
.enqueue(&WriteFileJob {
output_path: output_path.to_string_lossy().to_string(),
line: "seaorm".to_string(),
})
.await
.unwrap();
tokio::time::timeout(Duration::from_secs(2), async {
loop {
if std::fs::read_to_string(&output_path)
.map(|contents| contents.contains("seaorm"))
.unwrap_or(false)
{
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
worker.shutdown_and_wait().await;
}
fn test_options() -> ProcessorOptions {
ProcessorOptions {
max_attempts: 3,
poll_interval: Duration::from_millis(1),
backoff: BackoffStrategy::Fixed(Duration::ZERO),
}
}
fn sqlite_url(path: impl AsRef<Path>) -> String {
format!("sqlite://{}?mode=rwc", path.as_ref().display())
}