oxana 2.0.0-rc.3

A simple & fast job queue system.
Documentation
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;

use crate::error::OxanaError;
use crate::queue::{QueueConfig, QueueThrottle};
use crate::semaphores_map::SemaphoresMap;
use crate::storage_internal::StorageInternal;
use crate::throttler::Throttler;
use crate::worker_event::WorkerJob;
use crate::{Config, JobId};

pub async fn run<DT, ET>(
    config: Arc<Config<DT, ET>>,
    queue_config: QueueConfig,
    queue_key: String,
    job_tx: mpsc::Sender<WorkerJob>,
    semaphores: Arc<SemaphoresMap>,
) -> Result<(), OxanaError>
where
    DT: Send + Sync + Clone + 'static,
    ET: std::error::Error + Send + Sync + 'static,
{
    loop {
        let semaphore = semaphores.get_or_create(queue_key.clone()).await;
        let permit = semaphore.acquire_owned().await.unwrap();

        tokio::select! {
            result = pop_queue_message(&config.storage.internal, &queue_config, &queue_key) => {
                match config.storage.internal.track_redis_result(result)? {
                    Some(job_id) => {
                        let job = WorkerJob { job_id, permit };
                        job_tx
                            .send(job)
                            .await
                            .expect("Failed to send job to worker");
                    }
                    None => {
                        drop(permit);
                        sleep(Duration::from_secs(1)).await;
                    }
                }
            }
            _ = config.cancel_token.cancelled() => {
                tracing::debug!("Stopping dispatcher for queue {}", queue_key);
                drop(permit);
                break;
            }
        }
    }

    Ok(())
}

async fn pop_queue_message(
    storage: &StorageInternal,
    queue_config: &QueueConfig,
    queue_key: &str,
) -> Result<String, OxanaError> {
    match &queue_config.throttle {
        Some(throttle) => pop_queue_message_w_throttle(storage, queue_key, throttle).await,
        None => pop_queue_message_wo_throttle(storage, queue_key, 10.0).await,
    }
}

async fn pop_queue_message_wo_throttle(
    storage: &StorageInternal,
    queue_key: &str,
    timeout: f64,
) -> Result<String, OxanaError> {
    loop {
        if let Some(job_id) = storage.blocking_dequeue(queue_key, timeout).await? {
            return Ok(job_id);
        }
    }
}

async fn pop_queue_message_w_throttle(
    storage: &StorageInternal,
    queue_key: &str,
    throttle: &QueueThrottle,
) -> Result<JobId, OxanaError> {
    let pool = storage.pool().await?;
    loop {
        let throttler = Throttler::new(pool.clone(), queue_key, throttle.limit, throttle.window_ms);

        let state = throttler.state().await?;

        if state.is_allowed
            && let Some(job_id) = storage.dequeue(queue_key).await?
        {
            let cost = storage
                .get_job(&job_id)
                .await?
                .and_then(|envelope| envelope.meta.throttle_cost);
            throttler.consume(cost).await?;
            return Ok(job_id);
        }

        sleep(Duration::from_millis(
            u64::try_from(state.throttled_for.unwrap_or(100)).unwrap_or(100),
        ))
        .await;
    }
}