oxana 2.0.0-rc.5

A simple & fast job queue system.
Documentation
use deadpool_redis::redis::AsyncCommands;
use rand::distr::{Alphanumeric, SampleString};
use serde::{Deserialize, Serialize};
use tracing_subscriber::{EnvFilter, fmt, prelude::*};

#[derive(Debug, thiserror::Error)]
pub enum WorkerError {
    #[error("Generic error: {0}")]
    Generic(String),
    #[error("Redis error: {0}")]
    Redis(#[from] deadpool_redis::redis::RedisError),
    #[error("Redis error: {0}")]
    Pool(#[from] deadpool_redis::PoolError),
}

#[derive(Clone)]
pub struct WorkerState {
    pub redis: deadpool_redis::Pool,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct WorkerNoopJob {}

pub struct WorkerNoop;

#[async_trait::async_trait]
impl oxana::Worker<WorkerNoopJob> for WorkerNoop {
    type Error = WorkerError;

    async fn run_batch(
        &self,
        _jobs: Vec<oxana::BatchItem<WorkerNoopJob>>,
    ) -> Result<(), WorkerError> {
        Ok(())
    }
}

impl oxana::FromContext<()> for WorkerNoop {
    fn from_context(_ctx: &()) -> Self {
        Self
    }
}

impl oxana::Job for WorkerNoopJob {
    fn worker_name() -> &'static str {
        std::any::type_name::<WorkerNoop>()
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct WorkerRedisSetJob {
    pub key: String,
    pub value: String,
}

pub struct WorkerRedisSet {
    pub state: WorkerState,
}

#[async_trait::async_trait]
impl oxana::Worker<WorkerRedisSetJob> for WorkerRedisSet {
    type Error = WorkerError;

    async fn run_batch(
        &self,
        jobs: Vec<oxana::BatchItem<WorkerRedisSetJob>>,
    ) -> Result<(), WorkerError> {
        let mut redis = self.state.redis.get().await?;
        for item in jobs {
            let job = item.job;
            let _: () = redis.set_ex(&job.key, job.value, 3).await?;
        }
        Ok(())
    }
}

impl oxana::FromContext<WorkerState> for WorkerRedisSet {
    fn from_context(ctx: &WorkerState) -> Self {
        Self { state: ctx.clone() }
    }
}

impl oxana::Job for WorkerRedisSetJob {
    fn worker_name() -> &'static str {
        std::any::type_name::<WorkerRedisSet>()
    }
}

#[derive(Serialize)]
pub struct QueueOne;

impl oxana::Queue for QueueOne {
    fn to_config() -> oxana::QueueConfig {
        oxana::QueueConfig::as_static("one")
    }
}

pub fn setup() -> deadpool_redis::Pool {
    dotenvy::from_filename(".env.test").ok();

    tracing_subscriber::registry()
        .with(fmt::layer())
        .with(EnvFilter::from_default_env())
        .try_init()
        .ok();

    redis_pool()
}

pub fn redis_pool() -> deadpool_redis::Pool {
    let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL is not set");
    let mut cfg = deadpool_redis::Config::from_url(redis_url);
    cfg.pool = Some(deadpool_redis::PoolConfig {
        max_size: 10,
        timeouts: deadpool_redis::Timeouts {
            wait: Some(std::time::Duration::from_millis(50)),
            create: Some(std::time::Duration::from_millis(50)),
            recycle: Some(std::time::Duration::from_millis(50)),
        },
        ..Default::default()
    });
    cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))
        .expect("Failed to create Redis pool")
}

pub fn random_string() -> String {
    Alphanumeric.sample_string(&mut rand::rng(), 16)
}