use std::{
fmt::Display,
sync::{Arc},
time::{Duration, Instant},
thread,
marker::PhantomData,
};
use async_trait::async_trait;
use crate::metrics;
#[derive(Debug, Clone, PartialEq)]
pub enum TaskStatus {
Completed,
Failed(String),
}
impl From<&TaskStatus> for &'static str {
fn from(status: &TaskStatus) -> Self {
match status {
TaskStatus::Completed => "completed",
TaskStatus::Failed(_) => "failed",
}
}
}
#[derive(Debug)]
pub enum ProcessorError {
TaskFailed(String),
InternalError(String),
}
impl Display for ProcessorError {
fn fmt(
&self,
f: &mut std::fmt::Formatter<'_>,
) -> std::fmt::Result {
match self {
ProcessorError::TaskFailed(msg) => {
write!(f, "任务执行失败: {msg}")
},
ProcessorError::InternalError(msg) => {
write!(f, "内部错误: {msg}")
},
}
}
}
impl std::error::Error for ProcessorError {}
#[derive(Debug)]
pub struct TaskResult<T, O>
where
T: Send + Sync,
O: Send + Sync,
{
pub status: TaskStatus,
pub task: Option<T>,
pub output: Option<O>,
pub error: Option<String>,
pub processing_time: Duration,
}
#[async_trait]
pub trait TaskProcessor<T, O>: Send + Sync + 'static
where
T: Clone + Send + Sync + 'static,
O: Clone + Send + Sync + 'static,
{
async fn process(
&self,
task: T,
) -> Result<O, ProcessorError>;
}
pub struct SyncProcessor<T, O, P>
where
T: Clone + Send + Sync + 'static,
O: Clone + Send + Sync + 'static,
P: TaskProcessor<T, O>,
{
processor: Arc<P>,
max_retries: u32,
retry_delay: Duration,
_phantom: PhantomData<(T, O)>,
}
impl<T, O, P> SyncProcessor<T, O, P>
where
T: Clone + Send + Sync + 'static,
O: Clone + Send + Sync + 'static,
P: TaskProcessor<T, O>,
{
pub fn new(
processor: P,
max_retries: u32,
retry_delay: Duration,
) -> Self {
Self {
processor: Arc::new(processor),
max_retries,
retry_delay,
_phantom: PhantomData,
}
}
pub async fn process_task(
&self,
task: T,
) -> TaskResult<T, O> {
metrics::task_submitted();
let start_time = Instant::now();
let mut current_retry = 0;
loop {
match self.processor.process(task.clone()).await {
Ok(output) => {
let result = TaskResult {
status: TaskStatus::Completed,
task: Some(task),
output: Some(output),
error: None,
processing_time: start_time.elapsed(),
};
metrics::task_processing_duration(result.processing_time);
metrics::task_processed((&result.status).into());
return result;
},
Err(e) => {
if current_retry < self.max_retries {
current_retry += 1;
metrics::task_retried();
tokio::time::sleep(self.retry_delay).await;
continue;
}
let result = TaskResult {
status: TaskStatus::Failed(e.to_string()),
task: Some(task),
output: None,
error: Some(e.to_string()),
processing_time: start_time.elapsed(),
};
metrics::task_processing_duration(result.processing_time);
metrics::task_processed((&result.status).into());
return result;
},
}
}
}
pub async fn process_task_with_retry(
&self,
task: T,
max_retries: u32,
retry_delay: Duration,
) -> TaskResult<T, O> {
metrics::task_submitted();
let start_time = Instant::now();
let mut current_retry = 0;
loop {
match self.processor.process(task.clone()).await {
Ok(output) => {
let result = TaskResult {
status: TaskStatus::Completed,
task: Some(task),
output: Some(output),
error: None,
processing_time: start_time.elapsed(),
};
metrics::task_processing_duration(result.processing_time);
metrics::task_processed((&result.status).into());
return result;
},
Err(e) => {
if current_retry < max_retries {
current_retry += 1;
metrics::task_retried();
thread::sleep(retry_delay);
continue;
}
let result = TaskResult {
status: TaskStatus::Failed(e.to_string()),
task: Some(task),
output: None,
error: Some(e.to_string()),
processing_time: start_time.elapsed(),
};
metrics::task_processing_duration(result.processing_time);
metrics::task_processed((&result.status).into());
return result;
},
}
}
}
}