1use std::{
2 fmt::Display,
3 sync::{Arc},
4 time::{Duration, Instant},
5 thread,
6 marker::PhantomData,
7};
8
9use async_trait::async_trait;
10
11use crate::metrics;
12
/// Final outcome of a task after processing has finished.
///
/// Converts to a fixed label via `From<&TaskStatus> for &'static str`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskStatus {
    /// The processor produced an output.
    Completed,
    /// The processor gave up with an error; the payload is the error text.
    Failed(String),
}
19
20impl From<&TaskStatus> for &'static str {
21 fn from(status: &TaskStatus) -> Self {
22 match status {
23 TaskStatus::Completed => "completed",
24 TaskStatus::Failed(_) => "failed",
25 }
26 }
27}
28
/// Error returned by a task processor.
///
/// Both variants carry a human-readable message that is appended to a fixed
/// prefix by the `Display` implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessorError {
    /// The task itself failed to execute.
    TaskFailed(String),
    /// An internal error occurred while handling the task.
    InternalError(String),
}
35
36impl Display for ProcessorError {
37 fn fmt(
38 &self,
39 f: &mut std::fmt::Formatter<'_>,
40 ) -> std::fmt::Result {
41 match self {
42 ProcessorError::TaskFailed(msg) => {
43 write!(f, "任务执行失败: {}", msg)
44 },
45 ProcessorError::InternalError(msg) => {
46 write!(f, "内部错误: {}", msg)
47 },
48 }
49 }
50}
51
52impl std::error::Error for ProcessorError {}
53
54#[derive(Debug)]
56pub struct TaskResult<T, O>
57where
58 T: Send + Sync,
59 O: Send + Sync,
60{
61 pub status: TaskStatus,
62 pub task: Option<T>,
63 pub output: Option<O>,
64 pub error: Option<String>,
65 pub processing_time: Duration,
66}
67
68#[async_trait]
70pub trait TaskProcessor<T, O>: Send + Sync + 'static
71where
72 T: Clone + Send + Sync + 'static,
73 O: Clone + Send + Sync + 'static,
74{
75 async fn process(
76 &self,
77 task: T,
78 ) -> Result<O, ProcessorError>;
79}
80
81pub struct SyncProcessor<T, O, P>
83where
84 T: Clone + Send + Sync + 'static,
85 O: Clone + Send + Sync + 'static,
86 P: TaskProcessor<T, O>,
87{
88 processor: Arc<P>,
89 max_retries: u32,
90 retry_delay: Duration,
91 _phantom: PhantomData<(T, O)>,
92}
93
94impl<T, O, P> SyncProcessor<T, O, P>
95where
96 T: Clone + Send + Sync + 'static,
97 O: Clone + Send + Sync + 'static,
98 P: TaskProcessor<T, O>,
99{
100 pub fn new(
101 processor: P,
102 max_retries: u32,
103 retry_delay: Duration,
104 ) -> Self {
105 Self {
106 processor: Arc::new(processor),
107 max_retries,
108 retry_delay,
109 _phantom: PhantomData,
110 }
111 }
112
113 pub async fn process_task(
114 &self,
115 task: T,
116 ) -> TaskResult<T, O> {
117 metrics::task_submitted();
118 let start_time = Instant::now();
119 let mut current_retry = 0;
120
121 loop {
122 match self.processor.process(task.clone()).await {
123 Ok(output) => {
124 let result = TaskResult {
125 status: TaskStatus::Completed,
126 task: Some(task),
127 output: Some(output),
128 error: None,
129 processing_time: start_time.elapsed(),
130 };
131 metrics::task_processing_duration(result.processing_time);
132 metrics::task_processed((&result.status).into());
133 return result;
134 },
135 Err(e) => {
136 if current_retry < self.max_retries {
137 current_retry += 1;
138 metrics::task_retried();
139 thread::sleep(self.retry_delay);
140 continue;
141 }
142 let result = TaskResult {
143 status: TaskStatus::Failed(e.to_string()),
144 task: Some(task),
145 output: None,
146 error: Some(e.to_string()),
147 processing_time: start_time.elapsed(),
148 };
149 metrics::task_processing_duration(result.processing_time);
150 metrics::task_processed((&result.status).into());
151 return result;
152 },
153 }
154 }
155 }
156
157 pub async fn process_task_with_retry(
158 &self,
159 task: T,
160 max_retries: u32,
161 retry_delay: Duration,
162 ) -> TaskResult<T, O> {
163 metrics::task_submitted();
164 let start_time = Instant::now();
165 let mut current_retry = 0;
166
167 loop {
168 match self.processor.process(task.clone()).await {
169 Ok(output) => {
170 let result = TaskResult {
171 status: TaskStatus::Completed,
172 task: Some(task),
173 output: Some(output),
174 error: None,
175 processing_time: start_time.elapsed(),
176 };
177 metrics::task_processing_duration(result.processing_time);
178 metrics::task_processed((&result.status).into());
179 return result;
180 },
181 Err(e) => {
182 if current_retry < max_retries {
183 current_retry += 1;
184 metrics::task_retried();
185 thread::sleep(retry_delay);
186 continue;
187 }
188 let result = TaskResult {
189 status: TaskStatus::Failed(e.to_string()),
190 task: Some(task),
191 output: None,
192 error: Some(e.to_string()),
193 processing_time: start_time.elapsed(),
194 };
195 metrics::task_processing_duration(result.processing_time);
196 metrics::task_processed((&result.status).into());
197 return result;
198 },
199 }
200 }
201 }
202}