1use std::{
2 fmt::Display,
3 sync::{Arc},
4 time::{Duration, Instant},
5 thread,
6 marker::PhantomData,
7};
8
9use crate::metrics;
10
11#[derive(Debug, Clone, PartialEq)]
13pub enum TaskStatus {
14 Completed,
15 Failed(String),
16}
17
18impl From<&TaskStatus> for &'static str {
19 fn from(status: &TaskStatus) -> Self {
20 match status {
21 TaskStatus::Completed => "completed",
22 TaskStatus::Failed(_) => "failed",
23 }
24 }
25}
26
27#[derive(Debug)]
29pub enum ProcessorError {
30 TaskFailed(String),
31 InternalError(String),
32}
33
34impl Display for ProcessorError {
35 fn fmt(
36 &self,
37 f: &mut std::fmt::Formatter<'_>,
38 ) -> std::fmt::Result {
39 match self {
40 ProcessorError::TaskFailed(msg) => {
41 write!(f, "任务执行失败: {}", msg)
42 },
43 ProcessorError::InternalError(msg) => {
44 write!(f, "内部错误: {}", msg)
45 },
46 }
47 }
48}
49
50impl std::error::Error for ProcessorError {}
51
52#[derive(Debug)]
54pub struct TaskResult<T, O>
55where
56 T: Send + Sync,
57 O: Send + Sync,
58{
59 pub status: TaskStatus,
60 pub task: Option<T>,
61 pub output: Option<O>,
62 pub error: Option<String>,
63 pub processing_time: Duration,
64}
65
66pub trait TaskProcessor<T, O>: Send + Sync + 'static
68where
69 T: Clone + Send + Sync + 'static,
70 O: Clone + Send + Sync + 'static,
71{
72 fn process(
73 &self,
74 task: T,
75 ) -> Result<O, ProcessorError>;
76}
77
78pub struct SyncProcessor<T, O, P>
80where
81 T: Clone + Send + Sync + 'static,
82 O: Clone + Send + Sync + 'static,
83 P: TaskProcessor<T, O>,
84{
85 processor: Arc<P>,
86 max_retries: u32,
87 retry_delay: Duration,
88 _phantom: PhantomData<(T, O)>,
89}
90
91impl<T, O, P> SyncProcessor<T, O, P>
92where
93 T: Clone + Send + Sync + 'static,
94 O: Clone + Send + Sync + 'static,
95 P: TaskProcessor<T, O>,
96{
97 pub fn new(
98 processor: P,
99 max_retries: u32,
100 retry_delay: Duration,
101 ) -> Self {
102 Self {
103 processor: Arc::new(processor),
104 max_retries,
105 retry_delay,
106 _phantom: PhantomData,
107 }
108 }
109
110 pub fn process_task(
111 &self,
112 task: T,
113 ) -> TaskResult<T, O> {
114 metrics::task_submitted();
115 let start_time = Instant::now();
116 let mut current_retry = 0;
117
118 loop {
119 match self.processor.process(task.clone()) {
120 Ok(output) => {
121 let result = TaskResult {
122 status: TaskStatus::Completed,
123 task: Some(task),
124 output: Some(output),
125 error: None,
126 processing_time: start_time.elapsed(),
127 };
128 metrics::task_processing_duration(result.processing_time);
129 metrics::task_processed((&result.status).into());
130 return result;
131 },
132 Err(e) => {
133 if current_retry < self.max_retries {
134 current_retry += 1;
135 metrics::task_retried();
136 thread::sleep(self.retry_delay);
137 continue;
138 }
139 let result = TaskResult {
140 status: TaskStatus::Failed(e.to_string()),
141 task: Some(task),
142 output: None,
143 error: Some(e.to_string()),
144 processing_time: start_time.elapsed(),
145 };
146 metrics::task_processing_duration(result.processing_time);
147 metrics::task_processed((&result.status).into());
148 return result;
149 },
150 }
151 }
152 }
153
154 pub fn process_task_with_retry(
155 &self,
156 task: T,
157 max_retries: u32,
158 retry_delay: Duration,
159 ) -> TaskResult<T, O> {
160 metrics::task_submitted();
161 let start_time = Instant::now();
162 let mut current_retry = 0;
163
164 loop {
165 match self.processor.process(task.clone()) {
166 Ok(output) => {
167 let result = TaskResult {
168 status: TaskStatus::Completed,
169 task: Some(task),
170 output: Some(output),
171 error: None,
172 processing_time: start_time.elapsed(),
173 };
174 metrics::task_processing_duration(result.processing_time);
175 metrics::task_processed((&result.status).into());
176 return result;
177 },
178 Err(e) => {
179 if current_retry < max_retries {
180 current_retry += 1;
181 metrics::task_retried();
182 thread::sleep(retry_delay);
183 continue;
184 }
185 let result = TaskResult {
186 status: TaskStatus::Failed(e.to_string()),
187 task: Some(task),
188 output: None,
189 error: Some(e.to_string()),
190 processing_time: start_time.elapsed(),
191 };
192 metrics::task_processing_duration(result.processing_time);
193 metrics::task_processed((&result.status).into());
194 return result;
195 },
196 }
197 }
198 }
199}
200
201#[cfg(test)]
202mod tests {
203 use super::*;
204
205 struct TestProcessor;
206
207 impl TaskProcessor<i32, String> for TestProcessor {
208 fn process(
209 &self,
210 task: i32,
211 ) -> Result<String, ProcessorError> {
212 if task < 0 {
213 return Err(ProcessorError::TaskFailed("负数任务".to_string()));
214 }
215 thread::sleep(Duration::from_millis(100));
216 Ok(format!("Processed: {}", task))
217 }
218 }
219
220 #[test]
221 fn test_sync_processor() {
222 let processor =
223 SyncProcessor::new(TestProcessor, 3, Duration::from_millis(100));
224
225 let result = processor.process_task(42);
227 assert_eq!(result.status, TaskStatus::Completed);
228 assert!(result.error.is_none());
229 assert_eq!(result.output, Some("Processed: 42".to_string()));
230
231 let result = processor.process_task(-1);
233 assert_eq!(
234 result.status,
235 TaskStatus::Failed("任务执行失败: 负数任务".to_string())
236 );
237 assert!(result.output.is_none());
238 assert!(result.error.is_some());
239 }
240
241 #[test]
242 fn test_processor_with_retry() {
243 let processor =
244 SyncProcessor::new(TestProcessor, 3, Duration::from_millis(100));
245
246 let result =
248 processor.process_task_with_retry(42, 2, Duration::from_millis(50));
249 assert_eq!(result.status, TaskStatus::Completed);
250 assert!(result.error.is_none());
251 assert_eq!(result.output, Some("Processed: 42".to_string()));
252 }
253}