use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use rust_job_queue_api_worker_system::{
connect, migrate,
queue::enqueue,
worker::{ExecutionContext, ExecutionOutcome, Executor, WorkerRuntime},
Job, JobKind, NewJob, PoolConfig,
};
use serde_json::json;
use tokio_util::sync::CancellationToken;
struct LoggingExecutor;
#[async_trait]
impl Executor for LoggingExecutor {
async fn execute(&self, _ctx: &ExecutionContext, job: &Job) -> ExecutionOutcome {
println!(
"executing job_id={} kind={} attempt={} payload={}",
job.id,
job.kind.as_str(),
job.attempts,
job.payload
);
ExecutionOutcome::Succeeded
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let database_url =
std::env::var("DATABASE_URL").map_err(|_| anyhow::anyhow!("DATABASE_URL must be set"))?;
let pool = connect(&PoolConfig::from_url(database_url)).await?;
migrate(&pool).await?;
for i in 0..3 {
enqueue(
&pool,
NewJob {
kind: JobKind::SummarizeText,
payload: json!({ "text": format!("example {i}") }),
max_attempts: Some(1),
idempotency_key: None,
},
)
.await?;
}
let cancel = CancellationToken::new();
let runtime = WorkerRuntime::new(pool.clone(), Arc::new(LoggingExecutor))
.with_concurrency(1)
.with_poll_interval(Duration::from_millis(50), Duration::from_millis(200));
let cancel_for_shutdown = cancel.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(3)).await;
cancel_for_shutdown.cancel();
});
runtime.run(cancel).await;
println!("example done");
Ok(())
}