use crate::advanced::{CircuitBreaker, RetryPolicy};
use crate::context::Context;
use crate::error::TaskError;
use crate::metrics::MetricsCollector;
use crate::tasks::{Task, TaskOutput};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;
/// Final report produced for one task spawned on a `WorkerPool`.
#[derive(Debug)]
pub struct TaskResult {
/// Identifier of the task, captured from `Task::id()` when it was spawned.
pub id: String,
/// Number of times the task was actually executed; stays 0 when the task
/// was cancelled or rejected by the circuit breaker before its first run.
pub attempts: u32,
/// Output of the successful attempt, or the error that ended the task.
pub outcome: Result<TaskOutput, TaskError>,
}
/// Spawns tasks onto the tokio runtime with bounded concurrency, retries and
/// an optional per-attempt timeout.
pub struct WorkerPool {
/// Bounds how many spawned tasks execute at once; each running task holds
/// one permit until it finishes.
semaphore: Arc<Semaphore>,
/// Decides whether a failed attempt is retried and how long to wait first.
retry: RetryPolicy,
/// Time limit applied to each individual attempt; `None` disables it.
timeout: Option<Duration>,
/// Receives task-start and retry notifications.
metrics: Arc<MetricsCollector>,
}
impl WorkerPool {
    /// Creates a pool that executes at most `concurrency` tasks at once.
    ///
    /// `concurrency` is clamped to `1..=Semaphore::MAX_PERMITS`: with zero
    /// permits no task could ever start, and a value above `MAX_PERMITS`
    /// would make `Semaphore::new` panic (e.g. when a caller passes
    /// `usize::MAX` to mean "unbounded").
    ///
    /// * `retry` - policy consulted after every failed attempt.
    /// * `timeout` - optional limit for each individual attempt.
    /// * `metrics` - collector notified about task starts and retries.
    pub fn new(
        concurrency: usize,
        retry: RetryPolicy,
        timeout: Option<Duration>,
        metrics: Arc<MetricsCollector>,
    ) -> Self {
        let permits = concurrency.max(1).min(Semaphore::MAX_PERMITS);
        WorkerPool {
            semaphore: Arc::new(Semaphore::new(permits)),
            retry,
            timeout,
            metrics,
        }
    }

    /// Returns how many permits are currently free, i.e. how many more tasks
    /// could begin executing right now without waiting.
    pub fn available_permits(&self) -> usize {
        self.semaphore.available_permits()
    }

    /// Spawns `task` on the tokio runtime and returns a handle to its result.
    ///
    /// The spawned future first waits for a concurrency permit, then loops:
    ///
    /// 1. If `ctx` is cancelled, finish with `TaskError::Cancelled`.
    /// 2. If a `breaker` is supplied and refuses the request, finish with
    ///    `TaskError::CircuitOpen` carrying the task id.
    /// 3. Run one attempt (bounded by the pool's timeout, if any) and report
    ///    success or failure to the breaker.
    /// 4. On failure, retry only when the error is retryable, the retry
    ///    policy allows another attempt and `ctx` is not cancelled; otherwise
    ///    finish with that error. Before retrying, sleep for the delay the
    ///    policy requests.
    ///
    /// `TaskResult::attempts` counts the attempts actually executed.
    ///
    /// # Panics
    ///
    /// The spawned task panics if the pool's semaphore has been closed; the
    /// pool itself never closes it.
    pub fn spawn(
        &self,
        task: Arc<dyn Task>,
        ctx: Arc<Context>,
        breaker: Option<Arc<CircuitBreaker>>,
    ) -> JoinHandle<TaskResult> {
        let semaphore = self.semaphore.clone();
        let retry = self.retry;
        let timeout = self.timeout;
        let metrics = self.metrics.clone();
        let id = task.id().to_string();
        tokio::spawn(async move {
            // The permit is held (retries and back-off included) until this
            // future returns, which is what enforces the concurrency bound.
            let _permit = semaphore
                .acquire_owned()
                .await
                .expect("semaphore is never closed");
            metrics.task_started();
            let mut attempts = 0u32;
            loop {
                // Re-checked before every attempt so that a cancellation
                // raised during the back-off sleep is honoured.
                if ctx.is_cancelled() {
                    return TaskResult {
                        id,
                        attempts,
                        outcome: Err(TaskError::Cancelled),
                    };
                }
                if let Some(b) = &breaker {
                    if !b.allow_request() {
                        // One clone for the result id; the original string is
                        // moved into the error instead of being cloned again.
                        return TaskResult {
                            id: id.clone(),
                            attempts,
                            outcome: Err(TaskError::CircuitOpen(id)),
                        };
                    }
                }
                attempts += 1;
                let result = run_once(task.clone(), ctx.clone(), timeout).await;
                match result {
                    Ok(output) => {
                        if let Some(b) = &breaker {
                            b.record_success();
                        }
                        return TaskResult {
                            id,
                            attempts,
                            outcome: Ok(output),
                        };
                    }
                    Err(err) => {
                        if let Some(b) = &breaker {
                            b.record_failure();
                        }
                        let retryable = err.is_retryable()
                            && retry.should_retry(attempts)
                            && !ctx.is_cancelled();
                        if !retryable {
                            return TaskResult {
                                id,
                                attempts,
                                outcome: Err(err),
                            };
                        }
                        metrics.retry();
                        let delay = retry.delay_for(attempts);
                        if !delay.is_zero() {
                            tokio::time::sleep(delay).await;
                        }
                    }
                }
            }
        })
    }
}
/// Executes a single attempt of `task`.
///
/// When `timeout` is set, the attempt is raced against that limit and
/// `TaskError::Timeout` (carrying the limit) is returned if it elapses first;
/// otherwise the task's own result is passed through unchanged.
async fn run_once(
    task: Arc<dyn Task>,
    ctx: Arc<Context>,
    timeout: Option<Duration>,
) -> Result<TaskOutput, TaskError> {
    if let Some(limit) = timeout {
        tokio::time::timeout(limit, task.execute(ctx))
            .await
            .unwrap_or_else(|_elapsed| Err(TaskError::Timeout(limit)))
    } else {
        task.execute(ctx).await
    }
}