1use std::{
2 fmt::Display,
3 sync::{Arc},
4 time::{Duration, Instant},
5 thread,
6 marker::PhantomData,
7};
8
9#[derive(Debug, Clone, PartialEq)]
11pub enum TaskStatus {
12 Completed,
13 Failed(String),
14}
15
16#[derive(Debug)]
18pub enum ProcessorError {
19 TaskFailed(String),
20 InternalError(String),
21}
22
23impl Display for ProcessorError {
24 fn fmt(
25 &self,
26 f: &mut std::fmt::Formatter<'_>,
27 ) -> std::fmt::Result {
28 match self {
29 ProcessorError::TaskFailed(msg) => {
30 write!(f, "任务执行失败: {}", msg)
31 },
32 ProcessorError::InternalError(msg) => {
33 write!(f, "内部错误: {}", msg)
34 },
35 }
36 }
37}
38
39impl std::error::Error for ProcessorError {}
40
41#[derive(Debug)]
43pub struct TaskResult<T, O>
44where
45 T: Send + Sync,
46 O: Send + Sync,
47{
48 pub status: TaskStatus,
49 pub task: Option<T>,
50 pub output: Option<O>,
51 pub error: Option<String>,
52 pub processing_time: Duration,
53}
54
55pub trait TaskProcessor<T, O>: Send + Sync + 'static
57where
58 T: Clone + Send + Sync + 'static,
59 O: Clone + Send + Sync + 'static,
60{
61 fn process(
62 &self,
63 task: T,
64 ) -> Result<O, ProcessorError>;
65}
66
67pub struct SyncProcessor<T, O, P>
69where
70 T: Clone + Send + Sync + 'static,
71 O: Clone + Send + Sync + 'static,
72 P: TaskProcessor<T, O>,
73{
74 processor: Arc<P>,
75 max_retries: u32,
76 retry_delay: Duration,
77 _phantom: PhantomData<(T, O)>,
78}
79
80impl<T, O, P> SyncProcessor<T, O, P>
81where
82 T: Clone + Send + Sync + 'static,
83 O: Clone + Send + Sync + 'static,
84 P: TaskProcessor<T, O>,
85{
86 pub fn new(
87 processor: P,
88 max_retries: u32,
89 retry_delay: Duration,
90 ) -> Self {
91 Self {
92 processor: Arc::new(processor),
93 max_retries,
94 retry_delay,
95 _phantom: PhantomData,
96 }
97 }
98
99 pub fn process_task(
100 &self,
101 task: T,
102 ) -> TaskResult<T, O> {
103 let start_time = Instant::now();
104 let mut current_retry = 0;
105
106 loop {
107 match self.processor.process(task.clone()) {
108 Ok(output) => {
109 return TaskResult {
110 status: TaskStatus::Completed,
111 task: Some(task),
112 output: Some(output),
113 error: None,
114 processing_time: start_time.elapsed(),
115 };
116 },
117 Err(e) => {
118 if current_retry < self.max_retries {
119 current_retry += 1;
120 thread::sleep(self.retry_delay);
121 continue;
122 }
123 return TaskResult {
124 status: TaskStatus::Failed(e.to_string()),
125 task: Some(task),
126 output: None,
127 error: Some(e.to_string()),
128 processing_time: start_time.elapsed(),
129 };
130 },
131 }
132 }
133 }
134
135 pub fn process_task_with_retry(
136 &self,
137 task: T,
138 max_retries: u32,
139 retry_delay: Duration,
140 ) -> TaskResult<T, O> {
141 let start_time = Instant::now();
142 let mut current_retry = 0;
143
144 loop {
145 match self.processor.process(task.clone()) {
146 Ok(output) => {
147 return TaskResult {
148 status: TaskStatus::Completed,
149 task: Some(task),
150 output: Some(output),
151 error: None,
152 processing_time: start_time.elapsed(),
153 };
154 },
155 Err(e) => {
156 if current_retry < max_retries {
157 current_retry += 1;
158 thread::sleep(retry_delay);
159 continue;
160 }
161 return TaskResult {
162 status: TaskStatus::Failed(e.to_string()),
163 task: Some(task),
164 output: None,
165 error: Some(e.to_string()),
166 processing_time: start_time.elapsed(),
167 };
168 },
169 }
170 }
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use super::*;
177
178 struct TestProcessor;
179
180 impl TaskProcessor<i32, String> for TestProcessor {
181 fn process(
182 &self,
183 task: i32,
184 ) -> Result<String, ProcessorError> {
185 if task < 0 {
186 return Err(ProcessorError::TaskFailed("负数任务".to_string()));
187 }
188 thread::sleep(Duration::from_millis(100));
189 Ok(format!("Processed: {}", task))
190 }
191 }
192
193 #[test]
194 fn test_sync_processor() {
195 let processor =
196 SyncProcessor::new(TestProcessor, 3, Duration::from_millis(100));
197
198 let result = processor.process_task(42);
200 assert_eq!(result.status, TaskStatus::Completed);
201 assert!(result.error.is_none());
202 assert_eq!(result.output, Some("Processed: 42".to_string()));
203
204 let result = processor.process_task(-1);
206 assert_eq!(
207 result.status,
208 TaskStatus::Failed("任务执行失败: 负数任务".to_string())
209 );
210 assert!(result.output.is_none());
211 assert!(result.error.is_some());
212 }
213
214 #[test]
215 fn test_processor_with_retry() {
216 let processor =
217 SyncProcessor::new(TestProcessor, 3, Duration::from_millis(100));
218
219 let result =
221 processor.process_task_with_retry(42, 2, Duration::from_millis(50));
222 assert_eq!(result.status, TaskStatus::Completed);
223 assert!(result.error.is_none());
224 assert_eq!(result.output, Some("Processed: 42".to_string()));
225 }
226}