use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;
use crate::error::OxanaError;
use crate::queue::{QueueConfig, QueueThrottle};
use crate::semaphores_map::SemaphoresMap;
use crate::storage_internal::StorageInternal;
use crate::throttler::Throttler;
use crate::worker_event::WorkerJob;
use crate::{Config, JobId};
pub async fn run<DT, ET>(
config: Arc<Config<DT, ET>>,
queue_config: QueueConfig,
queue_key: String,
job_tx: mpsc::Sender<WorkerJob>,
semaphores: Arc<SemaphoresMap>,
) -> Result<(), OxanaError>
where
DT: Send + Sync + Clone + 'static,
ET: std::error::Error + Send + Sync + 'static,
{
loop {
let semaphore = semaphores.get_or_create(queue_key.clone()).await;
let permit = semaphore.acquire_owned().await.unwrap();
tokio::select! {
result = pop_queue_message(&config.storage.internal, &queue_config, &queue_key) => {
match config.storage.internal.track_redis_result(result)? {
Some(job_id) => {
let job = WorkerJob { job_id, permit };
job_tx
.send(job)
.await
.expect("Failed to send job to worker");
}
None => {
drop(permit);
sleep(Duration::from_secs(1)).await;
}
}
}
_ = config.cancel_token.cancelled() => {
tracing::debug!("Stopping dispatcher for queue {}", queue_key);
drop(permit);
break;
}
}
}
Ok(())
}
async fn pop_queue_message(
storage: &StorageInternal,
queue_config: &QueueConfig,
queue_key: &str,
) -> Result<String, OxanaError> {
match &queue_config.throttle {
Some(throttle) => pop_queue_message_w_throttle(storage, queue_key, throttle).await,
None => pop_queue_message_wo_throttle(storage, queue_key, 10.0).await,
}
}
async fn pop_queue_message_wo_throttle(
storage: &StorageInternal,
queue_key: &str,
timeout: f64,
) -> Result<String, OxanaError> {
loop {
if let Some(job_id) = storage.blocking_dequeue(queue_key, timeout).await? {
return Ok(job_id);
}
}
}
async fn pop_queue_message_w_throttle(
storage: &StorageInternal,
queue_key: &str,
throttle: &QueueThrottle,
) -> Result<JobId, OxanaError> {
let pool = storage.pool().await?;
loop {
let throttler = Throttler::new(pool.clone(), queue_key, throttle.limit, throttle.window_ms);
let state = throttler.state().await?;
if state.is_allowed
&& let Some(job_id) = storage.dequeue(queue_key).await?
{
let cost = storage
.get_job(&job_id)
.await?
.and_then(|envelope| envelope.meta.throttle_cost);
throttler.consume(cost).await?;
return Ok(job_id);
}
sleep(Duration::from_millis(
u64::try_from(state.throttled_for.unwrap_or(100)).unwrap_or(100),
))
.await;
}
}