oxana 2.0.0-rc.5

A simple & fast job queue system.
Documentation
use deadpool_redis::redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use testresult::TestResult;

use crate::shared::*;

#[derive(Debug, Serialize, Deserialize)]
pub struct WorkerUniqueSkipJob {
    pub id: i32,
    pub key: String,
    pub value: i32,
}

pub struct WorkerUniqueSkip {
    state: WorkerState,
}

impl oxana::Job for WorkerUniqueSkipJob {
    fn worker_name() -> &'static str {
        std::any::type_name::<WorkerUniqueSkip>()
    }
    fn unique_id(&self) -> Option<String> {
        Some(format!("unique:{}", self.id))
    }
    fn on_conflict(&self) -> oxana::JobConflictStrategy {
        oxana::JobConflictStrategy::Skip
    }
}

impl oxana::FromContext<WorkerState> for WorkerUniqueSkip {
    fn from_context(ctx: &WorkerState) -> Self {
        Self { state: ctx.clone() }
    }
}

#[async_trait::async_trait]
impl oxana::Worker<WorkerUniqueSkipJob> for WorkerUniqueSkip {
    type Error = WorkerError;

    async fn run_batch(
        &self,
        jobs: Vec<oxana::BatchItem<WorkerUniqueSkipJob>>,
    ) -> Result<(), WorkerError> {
        let mut redis = self.state.redis.get().await?;
        for item in jobs {
            let job = item.job;
            let _: () = redis.set_ex(&job.key, job.value.to_string(), 3).await?;
        }
        Ok(())
    }

    fn retry_delay(&self, _job: &WorkerUniqueSkipJob, _retries: u32) -> u64 {
        0
    }
    fn max_retries(&self, _job: &WorkerUniqueSkipJob) -> u32 {
        0
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct WorkerUniqueReplaceJob {
    pub id: i32,
    pub key: String,
    pub value: i32,
}

pub struct WorkerUniqueReplace {
    state: WorkerState,
}

impl oxana::Job for WorkerUniqueReplaceJob {
    fn worker_name() -> &'static str {
        std::any::type_name::<WorkerUniqueReplace>()
    }
    fn unique_id(&self) -> Option<String> {
        Some(format!("unique:{}", self.id))
    }
    fn on_conflict(&self) -> oxana::JobConflictStrategy {
        oxana::JobConflictStrategy::Replace
    }
}

impl oxana::FromContext<WorkerState> for WorkerUniqueReplace {
    fn from_context(ctx: &WorkerState) -> Self {
        Self { state: ctx.clone() }
    }
}

#[async_trait::async_trait]
impl oxana::Worker<WorkerUniqueReplaceJob> for WorkerUniqueReplace {
    type Error = WorkerError;

    async fn run_batch(
        &self,
        jobs: Vec<oxana::BatchItem<WorkerUniqueReplaceJob>>,
    ) -> Result<(), WorkerError> {
        let mut redis = self.state.redis.get().await?;
        for item in jobs {
            let job = item.job;
            let _: () = redis.set_ex(&job.key, job.value.to_string(), 3).await?;
        }
        Ok(())
    }

    fn retry_delay(&self, _job: &WorkerUniqueReplaceJob, _retries: u32) -> u64 {
        0
    }
    fn max_retries(&self, _job: &WorkerUniqueReplaceJob) -> u32 {
        0
    }
}

#[tokio::test]
pub async fn test_unique_skip() -> TestResult {
    let redis_pool = setup();
    let mut redis_conn = redis_pool.get().await?;
    let ctx = oxana::ContextValue::new(WorkerState {
        redis: redis_pool.clone(),
    });
    let storage = oxana::Storage::builder()
        .namespace(random_string())
        .build_from_pool(redis_pool.clone())?;
    let config = oxana::Config::new(&storage)
        .register_queue::<QueueOne>()
        .register_worker::<WorkerUniqueSkip, WorkerUniqueSkipJob>()
        .exit_when_processed(2);
    let key1 = random_string();
    let key2 = random_string();

    storage
        .enqueue(
            QueueOne,
            WorkerUniqueSkipJob {
                id: 1,
                key: key1.clone(),
                value: 1,
            },
        )
        .await?;
    storage
        .enqueue(
            QueueOne,
            WorkerUniqueSkipJob {
                id: 1,
                key: key1.clone(),
                value: 2,
            },
        )
        .await?;
    storage
        .enqueue(
            QueueOne,
            WorkerUniqueSkipJob {
                id: 2,
                key: key2.clone(),
                value: 3,
            },
        )
        .await?;
    storage
        .enqueue(
            QueueOne,
            WorkerUniqueSkipJob {
                id: 2,
                key: key2.clone(),
                value: 4,
            },
        )
        .await?;

    assert_eq!(storage.enqueued_count(QueueOne).await?, 2);

    oxana::run(config, ctx).await?;

    assert_eq!(storage.dead_count().await?, 0);
    assert_eq!(storage.enqueued_count(QueueOne).await?, 0);
    assert_eq!(storage.jobs_count().await?, 0);

    let value: Option<i32> = redis_conn.get(key1).await?;
    assert_eq!(value, Some(1));
    let value: Option<i32> = redis_conn.get(key2).await?;
    assert_eq!(value, Some(3));

    Ok(())
}

#[tokio::test]
pub async fn test_unique_replace() -> TestResult {
    let redis_pool = setup();
    let mut redis_conn = redis_pool.get().await?;
    let ctx = oxana::ContextValue::new(WorkerState {
        redis: redis_pool.clone(),
    });
    let storage = oxana::Storage::builder()
        .namespace(random_string())
        .build_from_pool(redis_pool)?;
    let config = oxana::Config::new(&storage)
        .register_queue::<QueueOne>()
        .register_worker::<WorkerUniqueReplace, WorkerUniqueReplaceJob>()
        .exit_when_processed(2);

    let key1 = random_string();
    let key2 = random_string();

    storage
        .enqueue(
            QueueOne,
            WorkerUniqueReplaceJob {
                id: 1,
                key: key1.clone(),
                value: 1,
            },
        )
        .await?;
    storage
        .enqueue(
            QueueOne,
            WorkerUniqueReplaceJob {
                id: 1,
                key: key1.clone(),
                value: 2,
            },
        )
        .await?;
    storage
        .enqueue(
            QueueOne,
            WorkerUniqueReplaceJob {
                id: 2,
                key: key2.clone(),
                value: 3,
            },
        )
        .await?;
    storage
        .enqueue(
            QueueOne,
            WorkerUniqueReplaceJob {
                id: 2,
                key: key2.clone(),
                value: 4,
            },
        )
        .await?;

    assert_eq!(storage.enqueued_count(QueueOne).await?, 2);

    oxana::run(config, ctx).await?;

    assert_eq!(storage.dead_count().await?, 0);
    assert_eq!(storage.enqueued_count(QueueOne).await?, 0);
    assert_eq!(storage.jobs_count().await?, 0);

    let value: Option<i32> = redis_conn.get(key1).await?;
    assert_eq!(value, Some(2));
    let value: Option<i32> = redis_conn.get(key2).await?;
    assert_eq!(value, Some(4));

    Ok(())
}